home / github

Menu
  • Search all tables
  • GraphQL API

issue_comments

Table actions
  • GraphQL API for issue_comments

2 rows where user = 6491058 sorted by updated_at descending

✎ View and edit SQL

This data as json, CSV (advanced)

Suggested facets: created_at (date), updated_at (date)

user 1

  • bjcosta · 2 ✖

issue 1

  • xarray groupby/map fails to parallelize 2

author_association 1

  • NONE 2
id html_url issue_url node_id user created_at updated_at ▲ author_association body reactions performed_via_github_app issue
585452791 https://github.com/pydata/xarray/issues/3762#issuecomment-585452791 https://api.github.com/repos/pydata/xarray/issues/3762 MDEyOklzc3VlQ29tbWVudDU4NTQ1Mjc5MQ== bjcosta 6491058 2020-02-12T22:34:56Z 2023-08-02T19:50:42Z NONE

Actually it looks like this example is relevant: https://xarray.pydata.org/en/stable/examples/apply_ufunc_vectorize_1d.html

Hi dcherian,

I had a look at the apply_ufunc() example you linked and have re-implemented my code. The example helped me understand apply_ufunc() usage better but is very different from my use case and I still am unable to parallelize using dask.

The key difference is apply_ufunc() as described in the docs and the example, applys a function to a vector of data of a single type (in the example case it is air temperature across the 3 dimensions lat,long,time).

Where as I need to apply an operation using heterogeneous data (depth_bins, lower_limit, upper_limit) over a single dimension (time) to produce a new array of depths over time (which is why I tried groupby/map initially).

Anyhow, I have an implementation using apply_ufunc() that works using xarray and numpy arrays with apply_ufunc(), but when I try to parallelize it using dask my ufunc is called with empty arrays by xarray and it fails.

I.e. You can see when running the code below it logs the following when entering the ufunc: args: (array([], shape=(0, 0), dtype=int32), array([], dtype=int32), array([], dtype=int32), array([], dtype=int32)), kwargs: {}

I was expecting this to be called once for each chunk with 1000 items for each array.

Have I done something wrong in this work-around for the groupby/map code?

Thanks, Brendon

```python import sys import math import logging import dask import xarray import numpy

logger = logging.getLogger('main')

if name == 'main': logging.basicConfig( stream=sys.stdout, format='%(asctime)s %(levelname)-8s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')

logger.info('Starting dask client')
client = dask.distributed.Client()

SIZE = 3000
SONAR_BINS = 2000
upper_limit = numpy.random.randint(0, 10, (SIZE))
lower_limit = numpy.random.randint(20, 30, (SIZE))
sonar_data = numpy.random.randint(0, 255, (SIZE, SONAR_BINS))

time = range(0, SIZE)

channel = xarray.Dataset({
        'upper_limit': (['time'], upper_limit, {'units': 'depth meters'}),
        'lower_limit': (['time'],  lower_limit, {'units': 'depth meters'}),
        'data': (['time', 'depth_bin'], sonar_data, {'units': 'amplitude'}),
    },
    coords={
        'time': (['time'], time),
        'depth_bin': (['depth_bin'], range(0,SONAR_BINS)),
    })

logger.info('get overall min/max radar range we want to normalize to called the adjusted range')
adjusted_min, adjusted_max = channel.upper_limit.min().values.item(), channel.lower_limit.max().values.item()
adjusted_min = math.floor(adjusted_min)
adjusted_max = math.ceil(adjusted_max)
logger.info('adjusted_min: %s, adjusted_max: %s', adjusted_min, adjusted_max)

bin_count = len(channel.depth_bin)
logger.info('bin_count: %s', bin_count)

adjusted_depth_per_bin = (adjusted_max - adjusted_min) / bin_count
logger.info('adjusted_depth_per_bin: %s', adjusted_depth_per_bin)

adjusted_bin_depths = [adjusted_min + (j * adjusted_depth_per_bin) for j in range(0, bin_count)]
logger.info('adjusted_bin_depths[0]: %s ... [-1]: %s', adjusted_bin_depths[0], adjusted_bin_depths[-1])

def InterpSingle(unadjusted_depth_amplitudes, unadjusted_min, unadjusted_max, time):

    if (time % 1000) == 0:
        total = len(channel.time)
        perc = 100.0 * time / total
        logger.info('%s : %s of %s', perc, time, total)

    unadjusted_depth_per_bin = (unadjusted_max - unadjusted_min) / bin_count

    min_index = (adjusted_min - unadjusted_min) / unadjusted_depth_per_bin
    max_index = ((adjusted_min + ((bin_count - 1) * adjusted_depth_per_bin)) - unadjusted_min) / unadjusted_depth_per_bin
    index_mapping = numpy.linspace(min_index, max_index, bin_count)
    adjusted_depth_amplitudes = numpy.interp(index_mapping, range(0, len(unadjusted_depth_amplitudes)), unadjusted_depth_amplitudes, left=0, right=0)
    return adjusted_depth_amplitudes

def Interp(*args, **kwargs):
    logger.info('args: %s, kwargs: %s', args, kwargs)

    data = args[0]
    upper_limit = args[1]
    lower_limit = args[2]
    time = args[3]
    #logger.info('data: %s len(data[0]): %s data.shape: %s', data, len(data[0]), data.shape)

    adjusted = []
    for i in range(0, len(upper_limit)):
        d = data[i]
        u = upper_limit[i]
        l = lower_limit[i]
        t = time[i]
        result = InterpSingle(d, u, l, t)
        adjusted.append(result)

    #logger.info('adjusted: %s', adjusted)
    return adjusted



channel = channel.chunk({'time':1000}) # Comment this line out to disable dask
#logger.info('Channel: %s', channel)
#logger.info('shape of data: %s len(data[0]): %s', channel.data.shape, len(channel.data[0]))
m2_lazy = xarray.apply_ufunc(
    Interp, 
    channel.data, 
    channel.upper_limit, 
    channel.lower_limit, 
    channel.time, 
    input_core_dims=[['depth_bin'], [], [], []], 
    output_core_dims=[['depth']],
    dask='parallelized',   # Comment this line out to disable dask
    output_dtypes=[numpy.dtype(numpy.int32)],   # Comment this line out to disable dask
    output_sizes={'depth':len(adjusted_bin_depths)},   # Comment this line out to disable dask
    )
m2 = m2_lazy.compute() # Comment this line out to disable dask
m2 = m2.assign_coords({'depth':adjusted_bin_depths})

logger.info('Closing dask client')
client.close()

logger.info('Exit process')

```

{
    "total_count": 0,
    "+1": 0,
    "-1": 0,
    "laugh": 0,
    "hooray": 0,
    "confused": 0,
    "heart": 0,
    "rocket": 0,
    "eyes": 0
}
  xarray groupby/map fails to parallelize 561921094
583775471 https://github.com/pydata/xarray/issues/3762#issuecomment-583775471 https://api.github.com/repos/pydata/xarray/issues/3762 MDEyOklzc3VlQ29tbWVudDU4Mzc3NTQ3MQ== bjcosta 6491058 2020-02-08T20:47:23Z 2020-02-08T20:47:23Z NONE

channel = channel.chunk({"time": 1, "depth_bin":-1})

Thanks for the really quick response. I tried chunking and that is when the performance gets worse (from 5msec per interp to 70msec). I just added the exact chunking you suggested and tested again as well. It is slower but it also only seems to be using 10% of one CPU.

I will have to wait until tomorrow to have a try of the ufunc method. It sounds like it might be a workaround so thanks for that suggestion. I still think there is a bug in this case (either in my code or in xarray).

{
    "total_count": 0,
    "+1": 0,
    "-1": 0,
    "laugh": 0,
    "hooray": 0,
    "confused": 0,
    "heart": 0,
    "rocket": 0,
    "eyes": 0
}
  xarray groupby/map fails to parallelize 561921094

Advanced export

JSON shape: default, array, newline-delimited, object

CSV options:

CREATE TABLE [issue_comments] (
   [html_url] TEXT,
   [issue_url] TEXT,
   [id] INTEGER PRIMARY KEY,
   [node_id] TEXT,
   [user] INTEGER REFERENCES [users]([id]),
   [created_at] TEXT,
   [updated_at] TEXT,
   [author_association] TEXT,
   [body] TEXT,
   [reactions] TEXT,
   [performed_via_github_app] TEXT,
   [issue] INTEGER REFERENCES [issues]([id])
);
CREATE INDEX [idx_issue_comments_issue]
    ON [issue_comments] ([issue]);
CREATE INDEX [idx_issue_comments_user]
    ON [issue_comments] ([user]);
Powered by Datasette · Queries took 5033.784ms · About: xarray-datasette