Can I parallelize `numpy.bincount` using `xarray.apply_ufunc`?
Problem description
I want to parallelize the numpy.bincount function using the apply_ufunc API of xarray. The following code is what I've tried:
import numpy as np
import xarray as xr

da = xr.DataArray(np.random.rand(2,16,32),
                  dims=['time', 'y', 'x'],
                  coords={'time': np.array(['2019-04-18', '2019-04-19'],
                                           dtype='datetime64'),
                          'y': np.arange(16), 'x': np.arange(32)})
f = xr.DataArray(da.data.reshape((2,512)),dims=['time','idx'])
x = da.x.values
y = da.y.values
r = np.sqrt(x[np.newaxis,:]**2 + y[:,np.newaxis]**2)
nbins = 4
if x.max() > y.max():
    ri = np.linspace(0., y.max(), nbins)
else:
    ri = np.linspace(0., x.max(), nbins)
ridx = np.digitize(np.ravel(r), ri)
func = lambda a, b: np.bincount(a, weights=b)
xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)
But I get the following error:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-203-974a8f0a89e8> in <module>()
12
13 func = lambda a, b: np.bincount(a, weights=b)
---> 14 xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)
~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_ufunc(func, *args, **kwargs)
979 signature=signature,
980 join=join,
--> 981 exclude_dims=exclude_dims)
982 elif any(isinstance(a, Variable) for a in args):
983 return variables_ufunc(*args)
~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_dataarray_ufunc(func, *args, **kwargs)
208
209 data_vars = [getattr(a, 'variable', a) for a in args]
--> 210 result_var = func(*data_vars)
211
212 if signature.num_outputs > 1:
~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_variable_ufunc(func, *args, **kwargs)
558 raise ValueError('unknown setting for dask array handling in '
559 'apply_ufunc: {}'.format(dask))
--> 560 result_data = func(*input_data)
561
562 if signature.num_outputs == 1:
<ipython-input-203-974a8f0a89e8> in <lambda>(a, b)
11 ridx = np.digitize(np.ravel(r), ri)
12
---> 13 func = lambda a, b: np.bincount(a, weights=b)
14 xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)
ValueError: object too deep for desired array
I am kind of lost as to where the error is stemming from, and any help would be greatly appreciated.
Recommended answer
The error arises because np.bincount only accepts one-dimensional inputs, while apply_ufunc hands it the full two-dimensional weights array. np.apply_along_axis can do the looping, but it iterates over 1D slices of only the first array argument to the applied function and not any of the others. If I understand your use case correctly, you actually want to iterate over 1D slices of the weights (weights in the np.bincount signature), not the integer array (x in the np.bincount signature).
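To make the point above concrete, here is a minimal NumPy-only sketch. The arrays `labels` and `weights_2d` and the helper `record_shapes` are made up for illustration and are not part of the original question. It shows that np.bincount refuses two-dimensional weights, and that np.apply_along_axis slices only the array it is given while forwarding any extra arguments unchanged.

```python
import numpy as np

labels = np.array([0, 1, 1, 2])            # hypothetical bin index per sample
weights_2d = np.arange(8.0).reshape(2, 4)  # two rows of weights, e.g. one per time step

# np.bincount only accepts 1-D inputs, so 2-D weights are rejected.
try:
    np.bincount(labels, weights=weights_2d)
    error_message = None
except ValueError as err:
    error_message = str(err)
print(error_message)

# np.apply_along_axis slices only the array it is given (weights_2d here);
# extra positional arguments such as `labels` are passed through whole.
seen_shapes = []

def record_shapes(w, x):
    seen_shapes.append((w.shape, x.shape))
    return w.sum()

row_sums = np.apply_along_axis(record_shapes, -1, weights_2d, labels)
print(seen_shapes)
print(row_sums)
```

Each call receives one 1D row of weights together with the complete `labels` array, which is why the array to be sliced has to come first.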
One way to work around this is to write a thin wrapper function around np.bincount that simply switches the order of the arguments:
def wrapped_bincount(weights, x):
    return np.bincount(x, weights=weights)
We can then use np.apply_along_axis with this function for your use case:
def apply_bincount_along_axis(x, weights, axis=-1):
    return np.apply_along_axis(wrapped_bincount, axis, weights, x)
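As a quick sanity check, the sketch below compares the helper against an explicit Python loop over rows. The two functions are repeated so the block runs on its own, and the random test data (`x`, `weights`, and the seed) are assumptions chosen only for illustration.

```python
import numpy as np

def wrapped_bincount(weights, x):
    return np.bincount(x, weights=weights)

def apply_bincount_along_axis(x, weights, axis=-1):
    return np.apply_along_axis(wrapped_bincount, axis, weights, x)

rng = np.random.default_rng(0)
x = rng.integers(0, 5, size=512)   # shared integer bin indices (illustrative data)
weights = rng.random((2, 512))     # one row of weights per time step

vectorized = apply_bincount_along_axis(x, weights)
# Reference result: call np.bincount once per row by hand.
looped = np.stack([np.bincount(x, weights=row) for row in weights])

print(vectorized.shape)
np.testing.assert_allclose(vectorized, looped)
```

Because every row is binned with the same `x`, each np.bincount call returns the same number of bins (`x.max() + 1`), so the per-row results stack into a regular 2D array.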
Finally, we can wrap this new function for use with xarray using apply_ufunc, noting that it can be automatically parallelized with dask. We do not need to provide an axis argument, because xarray automatically moves the input core dimension dim to the last position in the weights array before applying the function:
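def xbincount(x, weights):
    # x holds the integer bin indices and must be a 1-D DataArray.
    if len(x.dims) != 1:
        raise ValueError('x must be one-dimensional')

    dim, = x.dims
    # np.bincount returns max(x) + 1 bins; convert to a plain int.
    nbins = int(x.max()) + 1

    # np.float was removed in NumPy 1.24, so np.float64 is used instead.
    # Newer xarray releases expect output_sizes inside dask_gufunc_kwargs.
    return xr.apply_ufunc(apply_bincount_along_axis, x, weights,
                          input_core_dims=[[dim], [dim]],
                          output_core_dims=[['bin']], dask='parallelized',
                          output_dtypes=[np.float64], output_sizes={'bin': nbins})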
Applying this function to your example (with ridx wrapped in a DataArray, as in your original call) then looks like:
xbincount(xr.DataArray(ridx, dims=['idx']), f)
<xarray.DataArray (time: 2, bin: 5)>
array([[ 0. , 7.934821, 34.066872, 51.118065, 152.769169],
[ 0. , 11.692989, 33.262936, 44.993856, 157.642972]])
Dimensions without coordinates: time, bin
As desired, it also works with dask arrays:
xbincount(xr.DataArray(ridx, dims=['idx']), f.chunk({'time': 1}))
<xarray.DataArray (time: 2, bin: 5)>
dask.array<shape=(2, 5), dtype=float64, chunksize=(1, 5)>
Dimensions without coordinates: time, bin
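The core-dimension reordering described above can be checked on a tiny eager (non-dask) example. This is only a sketch: the names `bincount_last_axis`, `labels`, and `w` are invented for illustration, and the weights are deliberately stored with the core dimension first to show that apply_ufunc moves it to the end before calling the function.

```python
import numpy as np
import xarray as xr

def bincount_last_axis(x, weights):
    # x arrives as a 1-D integer array; weights arrives with the core
    # dimension last, so axis=-1 is always the right axis to reduce.
    return np.apply_along_axis(lambda w: np.bincount(x, weights=w), -1, weights)

labels = xr.DataArray(np.array([0, 1, 1, 2]), dims=['idx'])
# Core dimension 'idx' is stored FIRST here on purpose.
w = xr.DataArray(np.arange(8.0).reshape(4, 2), dims=['idx', 'time'])

out = xr.apply_ufunc(bincount_last_axis, labels, w,
                     input_core_dims=[['idx'], ['idx']],
                     output_core_dims=[['bin']])
print(out.dims)
print(out.values)
```

The result has dimensions (time, bin): the consumed core dimension idx is gone, and the new output core dimension bin is appended last.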