python dask DataFrame, support for (trivially parallelizable) row apply?


Question

I recently found the dask module, which aims to be an easy-to-use Python parallel processing module. A big selling point for me is that it works with pandas.

After reading a bit of its manual page, I can't find a way to do this trivially parallelizable task:

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply

At the moment, AFAIK, to achieve this in dask you need:

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

which is ugly syntax, and is actually slower than outright

df.apply(func, axis = 1) # for pandas DF row apply

Any suggestions?

Thanks @MRocklin for the map function. It seems to be slower than plain pandas apply. Is this related to the pandas GIL-releasing issue, or am I doing it wrong?

import numpy as np
import pandas as pd
import dask.dataframe as dd

s = pd.Series([10000] * 120)
ds = dd.from_pandas(s, npartitions=3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec

Answer

map_partitions

You can apply your function to all of the partitions of your dataframe with the map_partitions function.

df.map_partitions(func, columns=...)

Note that func will be given only part of the dataset at a time, not the entire dataset as with pandas apply (which presumably you wouldn't want if you want to do parallelism).
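A minimal sketch of that (the per-row function row_func, the toy columns, and the meta= hint are made up for illustration; newer dask versions ask for meta= to infer the output type):

import numpy as np
import pandas as pd
import dask.dataframe as dd

def row_func(row):
    # hypothetical per-row function, just for illustration
    return row['a'] + row['b']

pdf = pd.DataFrame(np.random.random((1000, 2)), columns=['a', 'b'])
ddf = dd.from_pandas(pdf, npartitions=4)

# each partition arrives as a plain pandas DataFrame, so a row-wise
# apply inside the function runs partition by partition
result = ddf.map_partitions(lambda part: part.apply(row_func, axis=1),
                            meta=('row_sum', 'f8')).compute()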

You can use map:

df.mycolumn.map(func)

You can use apply:

df.apply(func, axis=1)
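On a dask Series or DataFrame these mirror the pandas calls. A small sketch reusing the toy ddf and row_func from above (the meta= hints are assumptions for dask versions that want an output-type hint):

ddf['a'].map(lambda x: x * 2, meta=('a', 'f8'))      # element-wise over one column
ddf.apply(row_func, axis=1, meta=('row_sum', 'f8'))  # row-wise across the frame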

Threads vs Processes

As of version 0.6.0, dask.dataframe parallelizes with threads. Custom Python functions will not receive much benefit from thread-based parallelism. You could try processes instead:

df = dd.read_csv(...)

df.map_partitions(func, columns=...).compute(scheduler='processes')
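A self-contained sketch of the processes route using the question's slow_func (the partition count and meta= hint are arbitrary choices; the __main__ guard matters on platforms that spawn worker processes):

import numpy as np
import pandas as pd
import dask.dataframe as dd

def slow_func(k):
    # same Python-level loop as in the question
    A = np.random.normal(size=k)
    s = 0
    for a in A:
        s += 1 if a > 0 else -1
    return s

if __name__ == '__main__':  # required when workers are spawned as processes
    s = pd.Series([10000] * 120)
    ds = dd.from_pandas(s, npartitions=4)
    out = ds.map_partitions(lambda part: part.apply(slow_func),
                            meta=('s', 'i8')).compute(scheduler='processes')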

But avoid apply

However, you should really avoid apply with custom Python functions, both in pandas and in dask. This is often a source of poor performance. If you can find a way to do your operation in a vectorized manner, your pandas code may be 100x faster and you won't need dask.dataframe at all.
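To make that concrete for the question's slow_func, a vectorized rewrite looks like this (same result, but the loop runs in C instead of Python):

import numpy as np

def vectorized_func(k):
    A = np.random.normal(size=k)
    # +1 for each positive draw, -1 otherwise, summed without a Python loop
    return int(np.where(A > 0, 1, -1).sum())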

For your particular problem you might consider numba. This significantly improves your performance.

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
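The decorator form is the usual spelling. One caveat, as a hedged sketch: numba's nopython mode may reject the array-allocating size= argument to np.random.normal (worth checking against your numba version), so this variant draws one sample at a time:

import numba
import numpy as np

@numba.njit  # nopython mode: the whole loop compiles to machine code
def fast_func(k):
    s = 0
    for _ in range(k):
        # scalar draw; size= may not be supported in nopython mode
        s += 1 if np.random.normal() > 0 else -1
    return s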

Disclaimer: I work for the company that makes both numba and dask and employs many of the pandas developers.
