Tuesday, 29 December 2020

How to check which dataframe is being computed with dask

I am applying dictionaries to two dask dataframes and then merging them - this is done without calling compute().
Later, I use to_csv, which is the only point where my dataframes are actually computed.
I want to be able to detect KeyErrors and keep a log of them - separate logs for the two dataframes.

Is there a way to tell which dataframe is currently being computed?

The gist of my code is:

import csv
import dask.dataframe as dd

# alias_dict and conversion_error_handler are defined elsewhere in the full script
def convert(identifier):
    try:
        return alias_dict[str(identifier)]
    except KeyError as err:
        # current_df_converting is the hypothetical flag I am asking about
        if current_df_converting == 'pileup':
            conversion_error_handler(err.args[0], 'pileup_log')
        elif current_df_converting == 'lists':
            conversion_error_handler(err.args[0], 'lists_log')
        return identifier


pileup_df = dd.read_csv(pileup, sep='\t', header=None, quoting=csv.QUOTE_NONE, encoding='utf-8')
lists_df = dd.read_csv(lists, sep='\t', header=None, quoting=csv.QUOTE_NONE, encoding='utf-8')

# column names (including 'identifier' and 'position') are assigned elsewhere;
# with header=None, read_csv by itself would give integer column names
pileup_df['identifier'] = pileup_df.identifier.map(convert, meta=('identifier', str))
lists_df['identifier'] = lists_df.identifier.map(convert, meta=('identifier', str))

intersection_df = pileup_df.merge(lists_df, on=['identifier', 'position'])

dd.to_csv(intersection_df, output, header=None, index=None, single_file=True, sep='\t')
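For what it's worth, I suspect there is no such flag to read: dask builds a single task graph, and the one to_csv call computes partitions from both dataframes, possibly interleaved across workers, so a global "currently computing" marker would not be meaningful at execution time. One workaround I have considered (just a sketch, reusing my alias_dict and conversion_error_handler from above) is to bind each dataframe's log name into its own mapper with functools.partial, so the error handler always knows which dataframe the failing value came from:

from functools import partial

def convert(identifier, log_name):
    try:
        return alias_dict[str(identifier)]
    except KeyError as err:
        # log_name was baked in when the mapper was built, so no global flag is needed
        conversion_error_handler(err.args[0], log_name)
        return identifier

# each dataframe gets a mapper that carries its own log target
pileup_df['identifier'] = pileup_df.identifier.map(
    partial(convert, log_name='pileup_log'), meta=('identifier', str))
lists_df['identifier'] = lists_df.identifier.map(
    partial(convert, log_name='lists_log'), meta=('identifier', str))

This sidesteps the question of which dataframe is being computed entirely, but I would still like to know if dask exposes that information.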


