How to return one NumPy array per partition in Dask?
Problem description
I need to compute many NumPy arrays (that can be up to 4-dimensional), one for each partition of a Dask dataframe, and then add them as arrays. However, I'm struggling to make map_partitions return an array for each partition instead of a single array for all of them.
import dask.dataframe as dd
import numpy as np, pandas as pd
df = pd.DataFrame(range(15), columns=['x'])
ddf = dd.from_pandas(df, npartitions=3)
def func(partition):
    # Here I also tried returning the array in a list and in a tuple
    return np.array([[1, 2], [3, 4]])
# Here I tried all the options available for 'meta'
results = ddf.map_partitions(func).compute()
Then results is:
array([[1, 2],
       [3, 4],
       [1, 2],
       [3, 4],
       [1, 2],
       [3, 4]])
If I compute results.sum(), I get 30.
What I would like to get is:
[np.array([[1, 2],[3, 4]]), np.array([[1, 2],[3, 4]]), np.array([[1, 2],[3, 4]])]
So that if I compute the sum, I get:
array([[ 3,  6],
       [ 9, 12]])
How can you achieve this result with Dask?
Recommended answer
You are right, a Dask array is usually to be viewed as a single logical array that just happens to be made of pieces. Since you are not using the logical layer, you could have done your work with delayed alone. On the other hand, it seems like the end result you want really is a sum over all the data, so maybe an appropriate reshape followed by sum(axis=) would be even simpler:
ddf.map_partitions(func).compute_chunk_sizes().reshape(
-1, 2, 2).sum(axis=0).compute()
(compute_chunk_sizes is needed because, although your original pandas dataframe had a known size, Dask has not yet evaluated your function and so does not know what sizes it gives back.)
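To see why the reshape works, here is a minimal NumPy-only sketch with no Dask involved. It assumes, as in the question, that every partition yields a 2x2 array, so the concatenated result has shape (6, 2). The names stacked, blocks and total are illustrative only.

```python
import numpy as np

# The concatenated result from the question: three 2x2 blocks stacked row-wise
stacked = np.array([[1, 2], [3, 4]] * 3)   # shape (6, 2)

# Regroup the rows into one 2x2 block per partition
blocks = stacked.reshape(-1, 2, 2)         # shape (3, 2, 2)

# Sum over the leading "partition" axis to add the blocks elementwise
total = blocks.sum(axis=0)
print(total)
```

The same reshape only makes sense when all partitions return arrays of the same shape; otherwise the per-partition approach is the safer one.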
However, given your setup, the following would work and be closer to your original attempt; see .to_delayed():
import dask
list_of_delayed = ddf.map_partitions(func).to_delayed().tolist()
tuple_of_np_lists = dask.compute(*list_of_delayed)
(tolist forces evaluating the contained delayed objects)