I have a long list of .zarr arrays that I would like to merge into a single array and write to disk.
My code looks roughly like this:
```python
import dask.array
import zarr
import os

local_paths = ['parts/X_00000000.zarr',
               'parts/X_00000001.zarr',
               'parts/X_00000002.zarr',
               'parts/X_00000003.zarr',
               'parts/X_00000004.zarr',
               'parts/X_00000005.zarr',
               ...]
result_path = "testtest"
os.makedirs(result_path)

Xs = [dask.array.from_zarr(zarr.DirectoryStore(p)) for p in local_paths]
X = dask.array.concatenate(Xs, axis=0)
X = X.rechunk({0: 10000, 1: -1, 2: -1, 3: -1})
dask.array.to_zarr(X, zarr.DirectoryStore(result_path))
```
Each of the arrays in `local_paths` contains a list of 64x64 images, and the lists all have different lengths. For example, the shape of the first might be `(100, 64, 64, 3)` and the shape of the second `(200, 64, 64, 3)`.
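Some simple arithmetic shows why memory is the pressure point. Each task that materializes one block of `X.rechunk({0: 10000, ...})` has to hold 10000 images at once. The question does not state the dtype, so the sketch below assumes a few candidate dtypes. The helper `chunk_nbytes` is a hypothetical name used only for this illustration:

```python
# Rough memory estimate for one rechunked block of the merged array.
# Assumption: the dtype is not given in the question, so several candidate
# dtypes are tried; itemsize is the number of bytes per element.
ROWS_PER_CHUNK = 10000      # from X.rechunk({0: 10000, ...})
IMAGE_SHAPE = (64, 64, 3)   # per-image shape from the question


def chunk_nbytes(rows, image_shape, itemsize):
    """Bytes needed to hold one block of `rows` images in memory."""
    n_elements = rows
    for dim in image_shape:
        n_elements *= dim
    return n_elements * itemsize


for name, itemsize in [("uint8", 1), ("float32", 4), ("float64", 8)]:
    nbytes = chunk_nbytes(ROWS_PER_CHUNK, IMAGE_SHAPE, itemsize)
    print(f"{name:8s} {nbytes:>12,d} bytes  ~{nbytes / 2**20:.1f} MiB")
```

For `uint8` images, each block is 122,880,000 bytes (about 117 MiB). For `float64`, it is eight times that. Assembling a block from many smaller pieces may briefly require extra copies, and the default threaded scheduler can run several such tasks in parallel. Peak usage can therefore be a multiple of this figure.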
Executing the last line of this code consumes all of my memory, and then the Jupyter notebook crashes entirely (without an error message or an exception).
To investigate the problem, I visualized the task graph by replacing the last line with the following two lines:
```python
k = dask.array.to_zarr(X, zarr.DirectoryStore(result_path), compute=False)
k.visualize()
```
It is huge (link), so I took screenshots of only two interesting pieces of it:
This structure repeats throughout the graph: Dask takes the output of the concatenation, redistributes the data, and then tries to store it. Notice the thick black bar, which is the result of many overlapping edges.
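The redistribution step can be modelled in plain Python. The sketch below uses a hypothetical helper, `rechunk_sources`. It lays the parts end to end along axis 0, as `concatenate` does, and then lists which row ranges of which parts must be gathered to build each 10000-row output block. It assumes each part is a single chunk along axis 0 and ignores any intermediate split/merge stages that dask itself may add. It is an illustration, not dask's actual rechunk algorithm.

```python
from itertools import accumulate


def rechunk_sources(lengths, block_rows):
    """For each output block of `block_rows` rows (the last may be shorter),
    return the (part_index, start, stop) row ranges, local to each part,
    that must be gathered to assemble that block."""
    ends = list(accumulate(lengths))   # exclusive end row of each part
    starts = [0] + ends[:-1]           # start row of each part
    total = ends[-1] if ends else 0
    blocks = []
    for block_start in range(0, total, block_rows):
        block_stop = min(block_start + block_rows, total)
        pieces = []
        for i, (s, e) in enumerate(zip(starts, ends)):
            lo, hi = max(s, block_start), min(e, block_stop)
            if lo < hi:  # part i overlaps this output block
                pieces.append((i, lo - s, hi - s))
        blocks.append(pieces)
    return blocks


# Three small parts regrouped into 250-row blocks:
print(rechunk_sources([100, 200, 150], 250))
# [[(0, 0, 100), (1, 0, 150)], [(1, 150, 200), (2, 0, 150)]]
```

With parts of roughly 150 rows each (a made-up figure), every 10000-row block would draw from about 67 parts. This fan-in is what produces so many incoming edges per output block in the graph.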
Now look at where these edges come from:
Look at the `create` node in the middle. I assume that this is the part of the graph where the zarr `DirectoryStore` is created. The predecessor of the `create` node has an edge to every `store` node!
Here is my guess as to why Dask runs out of memory: it tries to resolve all of the rechunking and merging first, and by the time it should create the `DirectoryStore`, there is no memory left. None of the `store` nodes can be executed, because the `create` node is a precondition for each of them.
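The dependency structure described here can be reproduced as a toy graph with Python's standard `graphlib` module (Python 3.9+). The node names and the helper `toy_store_graph` are hypothetical. They are modelled on the screenshots, not on dask's real task keys. The graph confirms that `create` must finish before every `store` node. It also shows that `create`'s predecessor is ready from the very start, alongside the rechunk tasks, so the edges alone do not force all of the rechunking to happen first. The order in which ready tasks run is up to the scheduler.

```python
from graphlib import TopologicalSorter


def toy_store_graph(n_blocks):
    """Map each (hypothetical) node to the set of nodes it depends on."""
    graph = {"create-pred": set(), "create": {"create-pred"}}
    for i in range(n_blocks):
        graph[f"rechunk-{i}"] = set()
        # Each store needs its rechunked block and the created target.
        graph[f"store-{i}"] = {f"rechunk-{i}", "create"}
    return graph


ts = TopologicalSorter(toy_store_graph(3))
ts.prepare()
# Nodes with no unmet dependencies at the start: the create predecessor
# is ready just as early as every rechunk task.
first_ready = set(ts.get_ready())
print(sorted(first_ready))

# Any valid execution order still places 'create' before every store node.
order = list(TopologicalSorter(toy_store_graph(3)).static_order())
print(all(order.index("create") < order.index(f"store-{i}") for i in range(3)))
```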
Is my assumption correct? If yes, what can I do to force Dask to create the `DirectoryStore` first? If not, what else could be causing me to run out of memory?
UPDATE: When I use `dask.config.set(scheduler='single-threaded')`, the creation of the `DirectoryStore` (the `create` node) is not the problem: the output directory already contains some written files. So it must be the task graph itself that is too large.
from Storing Dask Array using Zarr Consumes Too Much Memory