Wednesday, 8 August 2018

Memory errors using xarray + dask - use groupby or apply_ufunc?

I am using xarray as the basis of my workflow for analysing fluid turbulence data, but I'm having trouble leveraging dask correctly to limit memory usage on my laptop.

I have a DataArray n with dimensions ('t', 'x', 'z'), which I've split into chunks of size 5 along the z dimension:

<xarray.DataArray 'n' (t: 801, x: 960, z: 512)>
dask.array<shape=(801, 960, 512), dtype=float32, chunksize=(801, 960, 5)>
Coordinates:
* t              (t) int64 200 201 202 203 204 205 206 207 208 209 210 211 ...
* x              (x) int64 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
* z              (z) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ...  

I want to calculate the root-mean-square fluctuations of n over t, returning a reduced DataArray with dimensions ('x', 'z'). I want dask to perform this operation one chunk at a time, as I only have a couple of GB of RAM available on my laptop at any time.
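For reference, here is roughly how an array like this could be set up; the random values and the exact coordinate ranges are hypothetical stand-ins for my real simulation output:

import dask.array as da
import numpy as np
import xarray as xr

# Hypothetical stand-in for the real data: a lazy dask array with the
# same shape, dtype, and chunking as the DataArray shown above.
n = xr.DataArray(
    da.random.random((801, 960, 512), chunks=(801, 960, 5)).astype(np.float32),
    dims=('t', 'x', 'z'),
    coords={'t': np.arange(200, 1001),
            'x': np.arange(2, 962),
            'z': np.arange(512)},
    name='n',
)
data = n.to_dataset()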

I've written a generalized ufunc to calculate the root mean square of a dask array:

import numpy as np

def rms_gufunc(x, axis=-1):
    """Generalized ufunc to calculate the root mean square along one axis."""
    # apply_ufunc moves the core dimension to the last axis, so default to -1;
    # groupby.reduce instead passes the axis of the reduced dimension explicitly.
    squares = np.square(x)
    return np.sqrt(squares.mean(axis=axis))
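A quick sanity check on a plain NumPy array (made-up numbers, just to confirm the reduction over the last axis):

x = np.array([[3.0, 4.0], [6.0, 8.0]])
print(rms_gufunc(x))  # [3.53553391 7.07106781]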

Now I'm not sure of the best way to apply this. As far as I can see, I can use either (1) xarray.apply_ufunc or (2) groupby.reduce.

1. Use apply_ufunc:

I can apply this function using xarray.apply_ufunc:

import xarray as xr

def rms(da, dim=None):
    """Reduce a DataArray by calculating the root mean square along dimension dim."""
    if dim is None:
        raise ValueError('Must supply a dimension along which to calculate rms')

    # dask='parallelized' applies rms_gufunc chunk-by-chunk without loading
    # the full array; the core dimension dim is moved to the last axis.
    return xr.apply_ufunc(rms_gufunc, da,
                          input_core_dims=[[dim]],
                          dask='parallelized', output_dtypes=[da.dtype])

n_rms = rms(data['n'], dim='t')
n_rms.load()  # Trigger computation

This seems to work, but it feels more complex than necessary.
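For comparison, the same reduction can be written with xarray's built-in lazy arithmetic, which dask also evaluates chunk-by-chunk (each (x, z) chunk is independent because 't' is not chunked):

# Alternative sketch using built-in operations; stays lazy until .load().
n_rms_alt = np.sqrt((data['n'] ** 2).mean(dim='t'))
n_rms_alt.load()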

2. Use groupby.reduce:

The xarray docs seem to suggest that this is a "split-apply-combine" operation, which I should be able to accomplish with something like

n_rms = data['n'].groupby('z').reduce(rms_gufunc, dim='t')

However, this raises a MemoryError, and I'm pretty sure that's not what the groupby step is for anyway. Am I supposed to be using groupby_bins to bin the data into the chunks I made along z?
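A quick check suggests the groupby step is splitting along every z value rather than along my dask chunks, which would mean rms_gufunc gets called 512 times on length-1 groups:

# groupby('z') makes one group per coordinate value, not per dask chunk.
grouped = data['n'].groupby('z')
print(len(grouped.groups))  # 512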

I would like to know a) if I'm using apply_ufunc correctly, and b) how I would do the same thing using groupby.



