issues: 613012939


id: 613012939
node_id: MDExOlB1bGxSZXF1ZXN0NDEzODQ3NzU0
number: 4035
title: Support parallel writes to regions of zarr stores
user: 1217238
state: closed
locked: 0
assignee: (none)
milestone: (none)
comments: 17
created_at: 2020-05-06T02:40:19Z
updated_at: 2020-11-04T06:19:01Z
closed_at: 2020-11-04T06:19:01Z
author_association: MEMBER
active_lock_reason: (none)
draft: 0
pull_request: pydata/xarray/pulls/4035
reactions: 4 (+1: 4)
performed_via_github_app: (none)
state_reason: (none)
repo: 13221727
type: pull

This PR adds a region keyword argument to to_zarr() to support parallel writes to different parts of arrays in a Zarr store, e.g., ds.to_zarr(..., region={'x': slice(1000, 2000)}) to write a dataset over the range 1000:2000 along the x dimension.

This is useful for creating large Zarr datasets without requiring dask. For example, the separate workers in a simulation job might each write a single non-overlapping chunk of a Zarr file. The standard way to handle such datasets today is to first write netCDF files in each process, and then consolidate them afterwards with dask (see #3096).
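A rough sketch of that worker-per-chunk workflow is below. The write_chunk function, the rank argument, and the chunk size are illustrative (not part of this PR), and it assumes the target store has already been created as described in the next section.

```python
import numpy as np
import xarray

def write_chunk(path, rank, chunk_size=100):
    # Hypothetical per-worker function: worker `rank` writes its own
    # non-overlapping slab of a pre-created Zarr store.
    start = rank * chunk_size
    region = {'x': slice(start, start + chunk_size)}
    # Stand-in for data this worker produced, e.g. a simulation result.
    data = np.arange(start, start + chunk_size)
    xarray.Dataset({'u': (('x',), data)}).to_zarr(path, region=region)
```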

Creating empty Zarr stores

To do this, the Zarr store must already exist with the desired variables in the right shapes and chunks. It is desirable to be able to create such stores without actually writing data, because the datasets we want to write in parallel may be very large.

In the example below, I achieve this by filling a Dataset with dask arrays and passing compute=False to to_zarr(). This works, but it relies on an undocumented implementation detail of the compute argument. We should either:

  1. Officially document that the compute argument only controls writing array values, not metadata (at least for zarr).
  2. Add a new keyword argument or entire new method for creating an unfilled Zarr store, e.g., write_values=False.

I think (1) is maybe the cleanest option (no extra API endpoints).
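For concreteness, here is a minimal sketch of that compute=False approach (the store path and sizes are made up). Because the template uses lazy dask arrays, the full-size dataset never has to fit in memory:

```python
import dask.array as da
import xarray

# Lazy template with the final on-disk shape and chunking; no values are
# computed or held in memory here.
template = xarray.Dataset({'u': (('x',), da.zeros(10_000_000, chunks=100_000))})

# Writes only the Zarr metadata (array shapes, chunks, attrs), not the values.
template.to_zarr('my-big-data.zarr', compute=False)
```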

Unchunked variables

One potential gotcha concerns coordinate arrays that are not chunked, e.g., consider parallel writing of a dataset divided along time with 2D latitude and longitude arrays that are fixed over all chunks. With the current PR, such coordinate arrays would get rewritten by each separate writer.

If a Zarr store does not have atomic writes, then conceivably this could result in corrupted data. The default DirectoryStore has atomic writes and cloud-based object stores should also be atomic, so perhaps this doesn't matter in practice, but at the very least it's inefficient and could cause issues for large-scale jobs due to resource contention.

Options include:

  1. Current behavior. Variables whose dimensions do not overlap with region are written by to_zarr(). This is likely the most intuitive behavior for writing from a single process at a time.
  2. Exclude variables whose dimensions do not overlap with region from being written. This is likely the most convenient behavior for writing from multiple processes at once.
  3. Like (2), but issue a warning if any such variables exist instead of silently dropping them.
  4. Like (2), but raise an error instead of a warning. Require the user to explicitly drop them with .drop(). This is probably the safest behavior.

I think (4) would be my preferred option. Some users would undoubtedly find this annoying, but the power-users for whom we are adding this feature would likely appreciate it.
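To illustrate what option (4) would look like from a writer's perspective (the variable names and store path are hypothetical, and the store is assumed to already exist with matching arrays):

```python
import numpy as np
import xarray

# One writer's time slab, plus 2D lat/lon coordinates that are identical
# across all writers and therefore don't overlap the written region.
chunk = xarray.Dataset(
    {'t2m': (('time', 'y', 'x'), np.zeros((10, 4, 5)))},
    coords={
        'lat': (('y', 'x'), np.zeros((4, 5))),
        'lon': (('y', 'x'), np.zeros((4, 5))),
    },
)

# Under option (4), chunk.to_zarr('weather.zarr', region=...) would raise
# because lat/lon don't overlap the region; the writer drops them first.
chunk.drop(['lat', 'lon']).to_zarr('weather.zarr', region={'time': slice(0, 10)})
```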

Usage example

```python
import xarray
import dask.array as da

ds = xarray.Dataset({'u': (('x',), da.arange(1000, chunks=100))})

# create the new zarr store, but don't write data
path = 'my-data.zarr'
ds.to_zarr(path, compute=False)

# look at the unwritten data
ds_opened = xarray.open_zarr(path)
print('Data before writing:', ds_opened.u.data[::100].compute())
# Data before writing: [ 1 100 1 100 100 1 1 1 1 1]

# write out each slice (could be in separate processes)
for start in range(0, 1000, 100):
    selection = {'x': slice(start, start + 100)}
    ds.isel(selection).to_zarr(path, region=selection)

print('Data after writing:', ds_opened.u.data[::100].compute())
# Data after writing: [  0 100 200 300 400 500 600 700 800 900]
```
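The loop above runs serially for simplicity. A rough sketch of driving the same region writes from separate processes with the standard library (nothing here is part of the PR itself) might look like:

```python
from concurrent.futures import ProcessPoolExecutor

import dask.array as da
import xarray

def write_region(start, path='my-data.zarr'):
    # Each process recreates the (lazy) dataset and writes one disjoint
    # region, so no coordination between writers is required.
    ds = xarray.Dataset({'u': (('x',), da.arange(1000, chunks=100))})
    selection = {'x': slice(start, start + 100)}
    ds.isel(selection).to_zarr(path, region=selection)

if __name__ == '__main__':
    # Assumes 'my-data.zarr' was already created with compute=False as above.
    with ProcessPoolExecutor() as pool:
        list(pool.map(write_region, range(0, 1000, 100)))
```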

  • [x] Closes https://github.com/pydata/xarray/issues/3096
  • [x] Integration test
  • [x] Unit tests
  • [x] Passes isort -rc . && black . && mypy . && flake8
  • [x] Fully documented, including whats-new.rst for all changes and api.rst for new API
