How to return one NumPy array per partition in Dask?

Question

I need to compute many NumPy arrays (that can be up to 4-dimensional), one for each partition of a Dask dataframe, and then add them as arrays. However, I'm struggling to make map_partitions return an array for each partition instead of a single array for all of them.

import dask.dataframe as dd
import numpy as np, pandas as pd

df = pd.DataFrame(range(15), columns=['x'])
ddf = dd.from_pandas(df, npartitions=3)

def func(partition):
    # Here I also tried returning the array in a list and in a tuple
    return np.array([[1, 2], [3, 4]])

# Here I tried all the options available for 'meta'
results = ddf.map_partitions(func).compute()

The result is then:

array([[1, 2],
       [3, 4],
       [1, 2],
       [3, 4],
       [1, 2],
       [3, 4]])

If I run results.sum().compute(), I get 30.

What I would like to get is:

[np.array([[1, 2],[3, 4]]), np.array([[1, 2],[3, 4]]), np.array([[1, 2],[3, 4]])]

So that if I compute the sum, I get:

array([[ 3,  6],
       [ 9, 12]])

How can you achieve this result with Dask?

Recommended answer

You are right: a Dask array is usually viewed as a single logical array that just happens to be made of pieces. Since you are not using the logical layer, you could have done your work with delayed alone. On the other hand, it seems the end result you want really is a sum over all the data, so an appropriate reshape followed by sum(axis=) may be even simpler:

ddf.map_partitions(func).compute_chunk_sizes().reshape(
    -1, 2, 2).sum(axis=0).compute()

(compute_chunk_sizes is needed because, although your original pandas dataframe had a known size, Dask has not yet evaluated your function and so does not know the sizes of the arrays it returns.)
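To see why the reshape recovers the per-partition sum, here is a minimal NumPy-only sketch. It assumes, as in the question, that every partition yields the same 2x2 block and that the blocks end up stacked along the first axis; the names block, stacked, and per_partition are illustrative only.

```python
import numpy as np

# The 2x2 block that func returns for each partition in the question.
block = np.array([[1, 2], [3, 4]])

# What the concatenated result looks like for three partitions: shape (6, 2).
stacked = np.concatenate([block, block, block], axis=0)

# Regroup the rows into one 2x2 array per partition: shape (3, 2, 2).
per_partition = stacked.reshape(-1, 2, 2)

# Summing over the first axis adds the per-partition arrays element-wise.
total = per_partition.sum(axis=0)
print(total)
```

The reshape only works this way when every partition returns an array of the same shape; for arrays with more dimensions the trailing shape passed to reshape would need to match that of a single partition's result.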

However, given your setup, the following also works and stays closer to your original attempt; see .to_delayed():

import dask

list_of_delayed = ddf.map_partitions(func).to_delayed().tolist()
tuple_of_np_lists = dask.compute(*list_of_delayed)

(tolist forces evaluating the contained delayed objects)
