Can I parallelize `numpy.bincount` using `xarray.apply_ufunc`?


Question

I want to parallelize the numpy.bincount function using xarray's apply_ufunc API. The following code is what I've tried:

import numpy as np
import xarray as xr
da = xr.DataArray(np.random.rand(2,16,32),
                  dims=['time', 'y', 'x'],
                  coords={'time': np.array(['2019-04-18', '2019-04-19'],
                                          dtype='datetime64'), 
                         'y': np.arange(16), 'x': np.arange(32)})

f = xr.DataArray(da.data.reshape((2,512)),dims=['time','idx'])
x = da.x.values
y = da.y.values
r = np.sqrt(x[np.newaxis,:]**2 + y[:,np.newaxis]**2)
nbins = 4
if x.max() > y.max():
    ri = np.linspace(0., y.max(), nbins)
else:
    ri = np.linspace(0., x.max(), nbins)

ridx = np.digitize(np.ravel(r), ri)

func = lambda a, b: np.bincount(a, weights=b)
xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)

But I get the following error:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-203-974a8f0a89e8> in <module>()
     12 
     13 func = lambda a, b: np.bincount(a, weights=b)
---> 14 xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)

~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_ufunc(func, *args, **kwargs)
    979                                      signature=signature,
    980                                      join=join,
--> 981                                      exclude_dims=exclude_dims)
    982     elif any(isinstance(a, Variable) for a in args):
    983         return variables_ufunc(*args)

~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_dataarray_ufunc(func, *args, **kwargs)
    208 
    209     data_vars = [getattr(a, 'variable', a) for a in args]
--> 210     result_var = func(*data_vars)
    211 
    212     if signature.num_outputs > 1:

~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_variable_ufunc(func, *args, **kwargs)
    558             raise ValueError('unknown setting for dask array handling in '
    559                              'apply_ufunc: {}'.format(dask))
--> 560     result_data = func(*input_data)
    561 
    562     if signature.num_outputs == 1:

<ipython-input-203-974a8f0a89e8> in <lambda>(a, b)
     11 ridx = np.digitize(np.ravel(r), ri)
     12 
---> 13 func = lambda a, b: np.bincount(a, weights=b)
     14 xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)

ValueError: object too deep for desired array

I am kind of lost as to where the error is stemming from, and any help would be greatly appreciated.

Recommended answer

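The error arises because np.bincount only accepts one-dimensional inputs, while the weights array f is two-dimensional (time, idx). The usual tool for applying a 1D function along one axis is np.apply_along_axis, but the issue is that apply_along_axis iterates over 1D slices of the first argument to the applied function and not any of the others. If I understand your use-case correctly, you actually want to iterate over 1D slices of the weights (weights in the np.bincount signature), not the integer array (x in the np.bincount signature).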
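
To make the failure mode concrete, here is a minimal sketch on small made-up arrays (the names idx and w are illustrative and do not come from the question). It shows np.bincount rejecting two-dimensional weights, while calling it once per 1D row of weights works:

```python
import numpy as np

idx = np.array([0, 1, 1, 2])             # bin index of each sample (1D)
w = np.array([[1.0, 2.0, 3.0, 4.0],      # two rows of weights, shape (2, 4)
              [10.0, 20.0, 30.0, 40.0]])

try:
    np.bincount(idx, weights=w)          # 2D weights are not accepted
    failed = False
except ValueError:
    failed = True

# Calling bincount once per 1D row of weights works
per_row = np.stack([np.bincount(idx, weights=row) for row in w])
```

The per-row loop is what the rest of the answer expresses with np.apply_along_axis.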

One way to work around this is to write a thin wrapper function around np.bincount that simply switches the order of the arguments:

def wrapped_bincount(weights, x):
    return np.bincount(x, weights=weights)

We can then use np.apply_along_axis with this function for your use-case:

def apply_bincount_along_axis(x, weights, axis=-1):
    return np.apply_along_axis(wrapped_bincount, axis, weights, x)
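
As a quick sanity check, the sketch below repeats the two helper definitions so that it runs on its own, and compares the result against an explicit per-row loop. The arrays idx and w are small made-up inputs, not the question's data:

```python
import numpy as np

def wrapped_bincount(weights, x):
    # Same as np.bincount, but with the weights as the first argument
    return np.bincount(x, weights=weights)

def apply_bincount_along_axis(x, weights, axis=-1):
    # apply_along_axis slices `weights` along `axis` and passes `x` through unchanged
    return np.apply_along_axis(wrapped_bincount, axis, weights, x)

idx = np.array([0, 2, 2, 1, 0])                  # shared bin indices
w = np.arange(15, dtype=float).reshape(3, 5)     # three rows of weights

result = apply_bincount_along_axis(idx, w)
expected = np.stack([np.bincount(idx, weights=row) for row in w])
```

Here result has one row per row of w and one column per bin, which is the layout the xarray wrapper relies on.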

Finally, we can wrap this new function for use with xarray using apply_ufunc, which also lets dask parallelize it automatically. Note that we do not need to provide an axis argument, because xarray automatically moves the input core dimension dim to the last position in the weights array before applying the function:

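def xbincount(x, weights):
    if len(x.dims) != 1:
        raise ValueError('x must be one-dimensional')

    dim, = x.dims
    # Size of the new 'bin' dimension as a plain int; dask needs it up front
    nbins = int(x.max()) + 1

    return xr.apply_ufunc(apply_bincount_along_axis, x, weights,
        input_core_dims=[[dim], [dim]],
        output_core_dims=[['bin']], dask='parallelized',
        # Builtin float, since the np.float alias is gone from current NumPy
        output_dtypes=[float],
        # Recent xarray versions take output_sizes through dask_gufunc_kwargs
        dask_gufunc_kwargs={'output_sizes': {'bin': nbins}})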

Applying this function to your example then looks like:

xbincount(xr.DataArray(ridx, dims=['idx']), f)

<xarray.DataArray (time: 2, bin: 5)>
array([[  0.      ,   7.934821,  34.066872,  51.118065, 152.769169],
       [  0.      ,  11.692989,  33.262936,  44.993856, 157.642972]])
Dimensions without coordinates: time, bin
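
To read this output: entry [t, k] is the sum of f at time t over all pixels whose radial bin index equals k. Bin 0 is empty because np.digitize only assigns index 0 to values below ri[0] = 0, and r is never negative. The NumPy-only sketch below checks this reading against boolean masks. It uses a seeded random array as a stand-in for the question's data, so its values will differ from the numbers printed above:

```python
import numpy as np

rng = np.random.default_rng(0)           # seeded stand-in for the question's data
f = rng.random((2, 512))                 # (time, idx)

x = np.arange(32)
y = np.arange(16)
r = np.sqrt(x[np.newaxis, :]**2 + y[:, np.newaxis]**2)
ri = np.linspace(0., min(x.max(), y.max()), 4)
ridx = np.digitize(np.ravel(r), ri)      # bin index of every pixel

# Weighted bincount, one time step at a time
binned = np.stack([np.bincount(ridx, weights=row) for row in f])

# The same sums computed with boolean masks, one bin at a time
masked = np.array([[row[ridx == k].sum() for k in range(ridx.max() + 1)]
                   for row in f])
```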

As desired, it also works with dask arrays:

xbincount(xr.DataArray(ridx, dims=['idx']), f.chunk({'time': 1}))

<xarray.DataArray (time: 2, bin: 5)>
dask.array<shape=(2, 5), dtype=float64, chunksize=(1, 5)>
Dimensions without coordinates: time, bin
