pull_requests: 413847754
| column | value |
| --- | --- |
| id | 413847754 |
| node_id | MDExOlB1bGxSZXF1ZXN0NDEzODQ3NzU0 |
| number | 4035 |
| state | closed |
| locked | 0 |
| title | Support parallel writes to regions of zarr stores |
| user | 1217238 |
| created_at | 2020-05-06T02:40:19Z |
| updated_at | 2020-11-04T06:19:01Z |
| closed_at | 2020-11-04T06:19:01Z |
| merged_at | 2020-11-04T06:19:01Z |
| merge_commit_sha | dd806b87bb91d98e920e459419782bb39c93cc2a |
| assignee |  |
| milestone |  |
| draft | 0 |
| head | 8c8578af6a015a66f347537dc3341478fca96744 |
| base | 83884a1c6dac4b5f6309dfea530414facc100bc8 |
| author_association | MEMBER |
| auto_merge |  |
| repo | 13221727 |
| url | https://github.com/pydata/xarray/pull/4035 |
| merged_by |  |

body:

This PR adds support for a `region` keyword argument to `to_zarr()`, which allows parallel writes to different parts of arrays in a Zarr store, e.g., `ds.to_zarr(..., region={'x': slice(1000, 2000)})` to write a dataset over the range `1000:2000` along the `x` dimension. This is useful for creating large Zarr datasets _without_ requiring dask. For example, the separate workers in a simulation job might each write a single non-overlapping chunk of a Zarr store. The standard way to handle such datasets today is to first write netCDF files in each process and then consolidate them afterwards with dask (see #3096).

### Creating empty Zarr stores

To write a region, the Zarr store must already exist, with the desired variables defined in the right shapes/chunks. It is desirable to be able to create such stores without actually writing data, because the datasets we want to write in parallel may be very large. In the example below, I achieve this by filling a `Dataset` with dask arrays and passing `compute=False` to `to_zarr()`. This works, but it relies on an undocumented implementation detail of the `compute` argument. We should either:

1. Officially document that the `compute` argument only controls writing array values, not metadata (at least for zarr).
2. Add a new keyword argument, or an entirely new method, for creating an unfilled Zarr store, e.g., `write_values=False`.

I think (1) is probably the cleanest option (no extra API).

### Unchunked variables

One potential gotcha concerns coordinate arrays that are not chunked, e.g., consider parallel writes of a dataset divided along time, with 2D `latitude` and `longitude` arrays that are fixed over all chunks. With the current PR, such coordinate arrays would get rewritten by each separate writer. If a Zarr store does not have atomic writes, this could conceivably result in corrupted data. The default DirectoryStore has atomic writes, and cloud-based object stores should also be atomic, so perhaps this doesn't matter in practice, but at the very least it is inefficient and could cause issues for large-scale jobs due to resource contention. Options include:

1. Current behavior: variables whose dimensions do not overlap with `region` are written by `to_zarr()`. *This is likely the most intuitive behavior for writing from a single process at a time.*
2. Exclude variables whose dimensions do not overlap with `region` from being written. This is likely the most convenient behavior for writing from multiple processes at once.
3. Like (2), but issue a warning if any such variables exist instead of silently dropping them.
4. Like (2), but raise an error instead of a warning, and require the user to explicitly drop these variables with `.drop()`. This is probably the safest behavior.

I think (4) would be my preferred option (a sketch of this workflow follows below). Some users would undoubtedly find this annoying, but the power users for whom we are adding this feature would likely appreciate it.
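To make option (4) concrete, here is a minimal sketch (not part of this PR) of what the explicit-drop workflow could look like for the `latitude`/`longitude` example above; the variable names, shapes, chunking, and store path are hypothetical, and `drop_vars` is used as the non-deprecated spelling of the `.drop()` call mentioned above.

```python
# Hypothetical sketch of the option (4) workflow: variables whose dimensions
# do not overlap the region are dropped explicitly before a region write.
import dask.array as da
import numpy as np
import xarray

# A dataset chunked along "time", with 2D latitude/longitude coordinates that
# are identical for every time chunk (names and shapes are made up).
ds = xarray.Dataset(
    {'t2m': (('time', 'y', 'x'), da.zeros((10, 4, 5), chunks=(2, 4, 5)))},
    coords={
        'latitude': (('y', 'x'), np.zeros((4, 5))),
        'longitude': (('y', 'x'), np.zeros((4, 5))),
    },
)

path = 'weather.zarr'
# Create the store once; latitude/longitude are numpy-backed, so they are
# written here, while the dask-backed t2m values are skipped (compute=False).
ds.to_zarr(path, compute=False)

# Each worker then writes only its own time region, explicitly dropping the
# coordinates that do not overlap the region.
region = {'time': slice(0, 2)}
ds.isel(region).drop_vars(['latitude', 'longitude']).to_zarr(path, region=region)
```

The explicit drop makes it obvious that each worker writes only the chunked variable, so the fixed coordinates are written exactly once, at store-creation time.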
### Usage example

```python
import xarray
import dask.array as da

ds = xarray.Dataset({'u': (('x',), da.arange(1000, chunks=100))})

# create the new zarr store, but don't write data
path = 'my-data.zarr'
ds.to_zarr(path, compute=False)

# look at the unwritten data
ds_opened = xarray.open_zarr(path)
print('Data before writing:', ds_opened.u.data[::100].compute())
# Data before writing: [ 1 100 1 100 100 1 1 1 1 1]

# write out each slice (could be in separate processes)
for start in range(0, 1000, 100):
    selection = {'x': slice(start, start + 100)}
    ds.isel(selection).to_zarr(path, region=selection)

print('Data after writing:', ds_opened.u.data[::100].compute())
# Data after writing: [ 0 100 200 300 400 500 600 700 800 900]
```

- [x] Closes https://github.com/pydata/xarray/issues/3096
- [x] Integration test
- [x] Unit tests
- [x] Passes `isort -rc . && black . && mypy . && flake8`
- [x] Fully documented, including `whats-new.rst` for all changes and `api.rst` for new API
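For completeness, here is a rough sketch (again, not from the PR itself) of how the loop in the usage example above might be fanned out across worker processes with the standard library; the `write_block` helper, the separate store path, and the 100-element chunk size are illustrative assumptions.

```python
# A minimal sketch of running the region writes from separate processes.
from concurrent.futures import ProcessPoolExecutor

import dask.array as da
import xarray

PATH = 'my-parallel-data.zarr'
CHUNK = 100


def write_block(start):
    # Each worker builds (or loads) just its own 100-element slice and writes
    # it to the matching region, so no two workers touch the same Zarr chunk.
    block = xarray.Dataset(
        {'u': (('x',), da.arange(start, start + CHUNK, chunks=CHUNK))}
    )
    block.to_zarr(PATH, region={'x': slice(start, start + CHUNK)})


if __name__ == '__main__':
    # Create the store up front without writing array values (compute=False).
    template = xarray.Dataset({'u': (('x',), da.arange(1000, chunks=CHUNK))})
    template.to_zarr(PATH, compute=False)

    with ProcessPoolExecutor() as pool:
        # Fan the ten region writes out across worker processes.
        list(pool.map(write_block, range(0, 1000, CHUNK)))
```

Because each worker writes a region that lines up exactly with one 100-element Zarr chunk, no two processes ever write to the same chunk, which is what makes the uncoordinated parallel writes safe.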
Links from other tables
- 1 row from pull_requests_id in labels_pull_requests