I am applying dictionaries to two dask dataframes and then merging them - this is done without compute(). Later I call to_csv, which is the only point where my dataframes are actually computed. I want to be able to detect KeyErrors and maintain a log of them - separate logs for the two dataframes. Is there a way to tell which dataframe is currently being computed?
The gist of my code is:
import csv

import dask.dataframe as dd

def convert(identifier):
    try:
        return alias_dict[str(identifier)]
    except KeyError as err:
        # current_df_converting is the part I don't know how to set:
        # it would have to reflect which dataframe is being computed.
        if current_df_converting == 'pileup':
            conversion_error_handler(err.args[0], 'pileup_log')
        elif current_df_converting == 'lists':
            conversion_error_handler(err.args[0], 'lists_log')
        return identifier

# alias_dict, conversion_error_handler, pileup, lists and output are
# defined elsewhere; the column names (identifier, position) are
# assigned elsewhere in the real code.
pileup_df = dd.read_csv(pileup, sep='\t', header=None, quoting=csv.QUOTE_NONE, encoding='utf-8')
lists_df = dd.read_csv(lists, sep='\t', header=None, quoting=csv.QUOTE_NONE, encoding='utf-8')
pileup_df['identifier'] = pileup_df.identifier.map(convert, meta=('identifier', str))
lists_df['identifier'] = lists_df.identifier.map(convert, meta=('identifier', str))
intersection_df = pileup_df.merge(lists_df, on=['identifier', 'position'])
# Nothing above runs until this call; both graphs execute here.
dd.to_csv(intersection_df, output, header=False, index=False, single_file=True, sep='\t')
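
For illustration, here is a minimal sketch of one way to sidestep the question entirely: bind the log destination to each map call with functools.partial, so convert never has to know which dataframe is currently running. It reuses alias_dict, pileup_df and lists_df from the gist above; the logger names, file paths and logging setup are hypothetical, and it assumes dask's default threaded scheduler (so the handlers configured here are visible to the tasks).

import logging
from functools import partial

# One separate log file per dataframe; names and paths are placeholders.
for name, path in [('pileup_log', 'pileup_errors.log'),
                   ('lists_log', 'lists_errors.log')]:
    logger = logging.getLogger(name)
    logger.addHandler(logging.FileHandler(path))
    logger.propagate = False

def convert(identifier, log_name):
    try:
        return alias_dict[str(identifier)]
    except KeyError as err:
        # Look the logger up by name at call time; each converter
        # already knows where its misses should go.
        logging.getLogger(log_name).warning('unknown identifier: %s', err.args[0])
        return identifier

# The destination is bound when the graph is built, not when it runs.
pileup_df['identifier'] = pileup_df.identifier.map(
    partial(convert, log_name='pileup_log'), meta=('identifier', str))
lists_df['identifier'] = lists_df.identifier.map(
    partial(convert, log_name='lists_log'), meta=('identifier', str))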