Process dask dataframe by chunks of rows


Problem Description


I have a dask dataframe created using chunks of a certain blocksize:

import dask.dataframe as dd
df = dd.read_csv(filepath, blocksize=blocksize * 1024 * 1024)


I can process it in chunks like this:

from dask import delayed

partial_results = []
for partition in df.partitions:
    partial = trivial_func(partition[var])
    partial_results.append(partial)
result = delayed(sum)(partial_results)


(Here I tried using map_partitions, but ended up just using a for loop instead.) Up to this point, everything works fine.
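
For reference, a minimal sketch of what the map_partitions variant could look like (trivial_func and var are the question's placeholders, and trivial_func is assumed to tolerate the dummy data dask uses to infer the output type):

# one result of trivial_func per partition, summed in pandas after compute()
per_partition = df.map_partitions(lambda part: trivial_func(part[var]))
result = per_partition.compute().sum()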


Now I need to run a function on the same data, but this function needs to receive a certain number of rows of the dataframe instead (e.g. rows_per_chunk=60). Is this achievable? With pandas, I would do:

partial_results = []
for i in range(int(len_df/rows_per_chunk)): # I think ceil would be better if decimal
    arg_data = df.iloc[i*rows_per_chunk:(i+1)*rows_per_chunk]
    partial = not_so_trivial_func(arg_data)
    partial_results.append(partial)
result = sum(partial_results)


Is it possible to do something like this with dask? I know that because of lazy evaluation, it's not possible to use iloc, but is it possible to partition the dataframe in a different way? If not, what would be the most efficient way to achieve this with dask? The dataframe has millions of rows.

Recommended Answer


You can repartition the dataframe along divisions, which define how index values are allocated across partitions (assuming a unique index).

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(range(15), columns=['x'])
ddf = dd.from_pandas(df, npartitions=3)

# there will be 5 rows per partition
print(ddf.map_partitions(len).compute())

# you can see that ddf is split along these index values
print(ddf.divisions)

# change the divisions to have the desired spacing
new_divisions = (0, 3, 6, 9, 12, 14)
new_ddf = ddf.repartition(divisions=new_divisions)

# now there will be 3 rows per partition
print(new_ddf.map_partitions(len).compute())
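
If the goal is a fixed number of rows per chunk, as in the question's rows_per_chunk, the divisions could also be derived instead of written by hand. A sketch, assuming the default RangeIndex 0..n-1 and a known total row count:

rows_per_chunk = 3
total_rows = len(df)  # known here because the source is a small pandas frame

# divisions list the first index of each partition plus the last index overall
derived_divisions = list(range(0, total_rows, rows_per_chunk))
if derived_divisions[-1] != total_rows - 1:
    derived_divisions.append(total_rows - 1)

derived_ddf = ddf.repartition(divisions=derived_divisions)
# same 3 rows per partition as new_ddf above
print(derived_ddf.map_partitions(len).compute())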


If the index is not known, then it's possible to create a new index (assuming the rows do not require sorting) and repartition along the computed divisions:

import dask.dataframe as dd
import pandas as pd

# save some data into unindexed csv
num_rows = 15
df = pd.DataFrame(range(num_rows), columns=['x'])
df.to_csv('dask_test.csv', index=False)


# read from csv
ddf = dd.read_csv('dask_test.csv', blocksize=10)

# assume that rows are already ordered (so no sorting is needed)
# then can modify the index using the lengths of partitions
cumlens = ddf.map_partitions(len).compute().cumsum()

# since processing will be done on a partition-by-partition basis, save them
# individually
new_partitions = [ddf.partitions[0]]
for npart, partition in enumerate(ddf.partitions[1:].partitions):
    partition.index = partition.index + cumlens[npart]
    new_partitions.append(partition)

# this is our new ddf
ddf = dd.concat(new_partitions)

#  set divisions based on cumulative lengths
ddf.divisions = tuple([0] + cumlens.tolist())

# change the divisions to have the desired spacing
new_partition_size = 12
max_rows = cumlens.tolist()[-1]
new_divisions = list(range(0, max_rows, new_partition_size))
if new_divisions[-1]<max_rows:
    new_divisions.append(max_rows)
new_ddf = ddf.repartition(divisions=new_divisions)

# now there will be desired rows per partition
print(new_ddf.map_partitions(len).compute())
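
With the partitions resized, the question's not_so_trivial_func (a placeholder) could then be applied partition by partition, mirroring the delayed pattern from the question:

from dask import delayed

# each element of to_delayed() evaluates to one partition as a pandas DataFrame
partials = [delayed(not_so_trivial_func)(part) for part in new_ddf.to_delayed()]
result = delayed(sum)(partials).compute()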
