Monday 8 March 2021

Dask: setting index on a big dataframe results in high disk space usage during processing

I am working with a large dataset (220 000 000 rows, ~25 GB as CSV) which is stored as a handful of CSV files.

I have already managed to read these CSV files with Dask and save the data as a parquet file with the following:

from pathlib import Path

import pandas as pd
from dask.distributed import Client
import dask.dataframe as dd
client = Client()

persist_path = Path("...")  # output directory (actual path omitted)

init_fields = {
# definition of csv fields
}

raw_data_paths = [
# filenames with their path
]

read_csv_kwargs = dict(
    sep=";",
    header=None,
    names=list(init_fields.keys()),
    dtype=init_fields,
    parse_dates=['date'],
)

ddf = dd.read_csv(
    raw_data_paths,
    **read_csv_kwargs,
)
ddf.to_parquet(persist_path / 'raw_data.parquet')

It works like a charm and completes within minutes. I get a parquet file holding a Dask DataFrame with 455 partitions, which I can use as is.

However, this dataframe consists of a huge list of client orders, which I would like to index by date for further processing.
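
For a rough sense of scale, the averages implied by these figures can be worked out as below. This is only a back-of-the-envelope sketch: it uses the CSV size on disk, not the in-memory size (which depends on the dtypes), and the helper name is made up for illustration.

```python
def per_partition_averages(total_rows, total_bytes, n_partitions):
    """Return (average rows, average bytes) per partition."""
    if n_partitions < 1:
        raise ValueError("n_partitions must be at least 1")
    return total_rows / n_partitions, total_bytes / n_partitions


# Figures from the post: 220 000 000 rows, ~25 GB of CSV, 455 partitions.
rows, size = per_partition_averages(220_000_000, 25e9, 455)
print(f"~{rows:,.0f} rows and ~{size / 1e6:.0f} MB of CSV per partition")
```

That comes to a little under half a million rows and about 55 MB of CSV per partition, so no single partition is large on its own.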

When I try to run the code with the adjustment below:

ddf = dd.read_csv(
    raw_data_paths,
    **read_csv_kwargs,
).set_index('date')
ddf.to_parquet(persist_path / 'raw_data.parquet')

the processing takes a really long time, with 26 000+ tasks (I can understand that, it is a lot of data to sort), but workers start dying after a while from using too much memory.

[Screenshot: Dask dashboard]

With each worker death, some progress is lost and it seems that the processing will never complete.

I have noticed that the worker deaths are related to the disk of my machine reaching its limit, and whenever a worker dies some space is freed. At the beginning of the processing, I have about 37 GB of free disk space.

I am quite new to Dask, so I have a few questions about that:

  • Is setting the index before dumping to a parquet file a good idea? I have several groupby operations on date coming in the next steps, and according to the Dask documentation, using this field as the index seemed like a good idea.
  • If I manage to set the index before dumping to a parquet file, will the parquet file be sorted, so that my further processing requires no more shuffling?
  • Does the behaviour described above (high disk usage followed by memory errors) seem normal, or is something odd in my setup or my use of Dask? Are there some parameters that I could tweak?
  • Or do I really need more disk space, because sorting this much data requires it? What would be an estimate of the total disk space required?
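
For reference on the third question, these are the settings I understand to govern spilling. A minimal sketch, assuming the standard Dask configuration keys `temporary-directory` and `distributed.worker.memory.*`; the helper names and the scratch path are placeholders of mine, and the fractions shown are the documented defaults rather than tuned values.

```python
def spill_settings(scratch_dir, target=0.60, spill=0.70, pause=0.80, terminate=0.95):
    """Build a dict of Dask config keys for worker spilling.

    Each threshold is a fraction of the per-worker memory limit.
    """
    if not 0 < target <= spill <= pause <= terminate <= 1:
        raise ValueError("thresholds must be increasing fractions in (0, 1]")
    return {
        # Directory where workers write spilled and shuffled data.
        "temporary-directory": scratch_dir,
        # Start spilling to disk based on managed memory.
        "distributed.worker.memory.target": target,
        # Start spilling to disk based on process memory.
        "distributed.worker.memory.spill": spill,
        # Stop running new tasks on the worker.
        "distributed.worker.memory.pause": pause,
        # The nanny restarts the worker.
        "distributed.worker.memory.terminate": terminate,
    }


def start_client(scratch_dir):
    """Apply the settings, then start a local cluster (requires dask.distributed)."""
    import dask
    from dask.distributed import Client

    dask.config.set(spill_settings(scratch_dir))
    return Client()


# Hypothetical usage, with a scratch directory on a disk that has free space:
# client = start_client("/path/to/scratch")
```

The configuration is applied before the `Client` is created so that the workers start with it.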

Thanks in advance!

EDIT: I finally managed to set the index by:

  • adding disk space on my machine
  • tweaking the client parameters to have more memory per worker

The parameters I used were:

client = Client(
    n_workers=1,
    threads_per_worker=8,
    processes=True,
    memory_limit='31GB'
)

I am now less convinced that disk usage was the root cause of my workers dying from lack of memory, because increasing disk space alone did not allow the processing to complete. It also required more memory per worker, which I achieved by creating a single worker with the whole memory of my machine.

However, I am quite surprised that so much memory was needed. I thought that one of the aims of Dask (and other big data tools) was to enable "out-of-core processing". Am I doing something wrong here, or does setting an index require a large amount of memory no matter what?
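
To make the arithmetic explicit: `memory_limit` in `Client` applies per worker, so splitting the machine across several worker processes divides the memory available to each one. A small sketch, where the helper names are hypothetical and the 31 GB total is just the figure from my setup:

```python
def memory_per_worker_gb(total_gb, n_workers):
    """Memory each worker gets when the total is split evenly."""
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    return total_gb / n_workers


def format_limit(total_gb, n_workers):
    """Format the per-worker share as a string such as '7.75GB'."""
    return f"{memory_per_worker_gb(total_gb, n_workers):.2f}GB"


def make_client(total_gb, n_workers, threads_per_worker):
    """Start a local cluster with an even memory split (requires dask.distributed)."""
    from dask.distributed import Client

    return Client(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        processes=True,
        memory_limit=format_limit(total_gb, n_workers),  # limit per worker
    )


for n in (1, 4, 8):
    print(n, "worker(s):", format_limit(31, n), "each")
```

With four workers, each one would get 7.75 GB of the 31 GB instead of all of it, which is the difference between my first attempts and the single-worker setup above.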

Regards,
