home / github / issue_comments

Menu
  • Search all tables
  • GraphQL API

issue_comments: 585452791

This data as json

html_url issue_url id node_id user created_at updated_at author_association body reactions performed_via_github_app issue
https://github.com/pydata/xarray/issues/3762#issuecomment-585452791 https://api.github.com/repos/pydata/xarray/issues/3762 585452791 MDEyOklzc3VlQ29tbWVudDU4NTQ1Mjc5MQ== 6491058 2020-02-12T22:34:56Z 2023-08-02T19:50:42Z NONE

Actually it looks like this example is relevant: https://xarray.pydata.org/en/stable/examples/apply_ufunc_vectorize_1d.html

Hi dcherian,

I had a look at the apply_ufunc() example you linked and have re-implemented my code. The example helped me understand apply_ufunc() usage better but is very different from my use case and I still am unable to parallelize using dask.

The key difference is apply_ufunc() as described in the docs and the example, applys a function to a vector of data of a single type (in the example case it is air temperature across the 3 dimensions lat,long,time).

Where as I need to apply an operation using heterogeneous data (depth_bins, lower_limit, upper_limit) over a single dimension (time) to produce a new array of depths over time (which is why I tried groupby/map initially).

Anyhow, I have an implementation using apply_ufunc() that works using xarray and numpy arrays with apply_ufunc(), but when I try to parallelize it using dask my ufunc is called with empty arrays by xarray and it fails.

I.e. You can see when running the code below it logs the following when entering the ufunc: args: (array([], shape=(0, 0), dtype=int32), array([], dtype=int32), array([], dtype=int32), array([], dtype=int32)), kwargs: {}

I was expecting this to be called once for each chunk with 1000 items for each array.

Have I done something wrong in this work-around for the groupby/map code?

Thanks, Brendon

```python import sys import math import logging import dask import xarray import numpy

logger = logging.getLogger('main')

if name == 'main': logging.basicConfig( stream=sys.stdout, format='%(asctime)s %(levelname)-8s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')

logger.info('Starting dask client')
client = dask.distributed.Client()

SIZE = 3000
SONAR_BINS = 2000
upper_limit = numpy.random.randint(0, 10, (SIZE))
lower_limit = numpy.random.randint(20, 30, (SIZE))
sonar_data = numpy.random.randint(0, 255, (SIZE, SONAR_BINS))

time = range(0, SIZE)

channel = xarray.Dataset({
        'upper_limit': (['time'], upper_limit, {'units': 'depth meters'}),
        'lower_limit': (['time'],  lower_limit, {'units': 'depth meters'}),
        'data': (['time', 'depth_bin'], sonar_data, {'units': 'amplitude'}),
    },
    coords={
        'time': (['time'], time),
        'depth_bin': (['depth_bin'], range(0,SONAR_BINS)),
    })

logger.info('get overall min/max radar range we want to normalize to called the adjusted range')
adjusted_min, adjusted_max = channel.upper_limit.min().values.item(), channel.lower_limit.max().values.item()
adjusted_min = math.floor(adjusted_min)
adjusted_max = math.ceil(adjusted_max)
logger.info('adjusted_min: %s, adjusted_max: %s', adjusted_min, adjusted_max)

bin_count = len(channel.depth_bin)
logger.info('bin_count: %s', bin_count)

adjusted_depth_per_bin = (adjusted_max - adjusted_min) / bin_count
logger.info('adjusted_depth_per_bin: %s', adjusted_depth_per_bin)

adjusted_bin_depths = [adjusted_min + (j * adjusted_depth_per_bin) for j in range(0, bin_count)]
logger.info('adjusted_bin_depths[0]: %s ... [-1]: %s', adjusted_bin_depths[0], adjusted_bin_depths[-1])

def InterpSingle(unadjusted_depth_amplitudes, unadjusted_min, unadjusted_max, time):

    if (time % 1000) == 0:
        total = len(channel.time)
        perc = 100.0 * time / total
        logger.info('%s : %s of %s', perc, time, total)

    unadjusted_depth_per_bin = (unadjusted_max - unadjusted_min) / bin_count

    min_index = (adjusted_min - unadjusted_min) / unadjusted_depth_per_bin
    max_index = ((adjusted_min + ((bin_count - 1) * adjusted_depth_per_bin)) - unadjusted_min) / unadjusted_depth_per_bin
    index_mapping = numpy.linspace(min_index, max_index, bin_count)
    adjusted_depth_amplitudes = numpy.interp(index_mapping, range(0, len(unadjusted_depth_amplitudes)), unadjusted_depth_amplitudes, left=0, right=0)
    return adjusted_depth_amplitudes

def Interp(*args, **kwargs):
    logger.info('args: %s, kwargs: %s', args, kwargs)

    data = args[0]
    upper_limit = args[1]
    lower_limit = args[2]
    time = args[3]
    #logger.info('data: %s len(data[0]): %s data.shape: %s', data, len(data[0]), data.shape)

    adjusted = []
    for i in range(0, len(upper_limit)):
        d = data[i]
        u = upper_limit[i]
        l = lower_limit[i]
        t = time[i]
        result = InterpSingle(d, u, l, t)
        adjusted.append(result)

    #logger.info('adjusted: %s', adjusted)
    return adjusted



channel = channel.chunk({'time':1000}) # Comment this line out to disable dask
#logger.info('Channel: %s', channel)
#logger.info('shape of data: %s len(data[0]): %s', channel.data.shape, len(channel.data[0]))
m2_lazy = xarray.apply_ufunc(
    Interp, 
    channel.data, 
    channel.upper_limit, 
    channel.lower_limit, 
    channel.time, 
    input_core_dims=[['depth_bin'], [], [], []], 
    output_core_dims=[['depth']],
    dask='parallelized',   # Comment this line out to disable dask
    output_dtypes=[numpy.dtype(numpy.int32)],   # Comment this line out to disable dask
    output_sizes={'depth':len(adjusted_bin_depths)},   # Comment this line out to disable dask
    )
m2 = m2_lazy.compute() # Comment this line out to disable dask
m2 = m2.assign_coords({'depth':adjusted_bin_depths})

logger.info('Closing dask client')
client.close()

logger.info('Exit process')

```

{
    "total_count": 0,
    "+1": 0,
    "-1": 0,
    "laugh": 0,
    "hooray": 0,
    "confused": 0,
    "heart": 0,
    "rocket": 0,
    "eyes": 0
}
  561921094
Powered by Datasette · Queries took 0.944ms · About: xarray-datasette