Why is multiprocessing slower than a simple computation in Pandas?


Problem Description


This is related to: How to parallelize many (fuzzy) string comparisons using apply in Pandas?

Consider this simple (but funny) example again:

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master = pd.DataFrame({'original': ['this is a nice sentence',
                                    'this is another one',
                                    'stackoverflow is nice']})

slave = pd.DataFrame({'name': ['hello world',
                               'congratulations',
                               'this is a nice sentence ',
                               'this is another one',
                               'stackoverflow is nice'],
                      'my_value': [1, 2, 3, 4, 5]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(),'my_value']

master
Out[39]: 
                  original
0  this is a nice sentence
1      this is another one
2    stackoverflow is nice

slave
Out[40]: 
   my_value                      name
0         1               hello world
1         2           congratulations
2         3  this is a nice sentence 
3         4       this is another one
4         5     stackoverflow is nice

What I need to do is simple:

  • For every row in master, I lookup into the Dataframe slave for the best match using the string similarity score computed by fuzzywuzzy.

Now let's make these dataframes a bit bigger:

master = pd.concat([master] * 100, ignore_index=True)
slave = pd.concat([slave] * 10, ignore_index=True)

First, I have tried with dask

#prepare the computation
dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))

and now here are the timings:

#multithreaded
%timeit dmaster.compute(get=dask.threaded.get) 
1 loop, best of 3: 346 ms per loop

#multiprocess
%timeit dmaster.compute(get=dask.multiprocessing.get) 
1 loop, best of 3: 1.93 s per loop

#good 'ol pandas
%timeit master['my_value'] = master.original.apply(lambda x: helper(x,slave))
100 loops, best of 3: 2.18 ms per loop

Second, I have tried with the good old multiprocessing package

from multiprocessing import Pool, cpu_count

def myfunc(df):
    return df.original.apply(lambda x: helper(x, slave))

from datetime import datetime

if __name__ == '__main__':
    startTime = datetime.now()
    p = Pool(cpu_count() - 1)
    # split master into three roughly equal chunks, one per worker
    ret_list = p.map(myfunc, [master.iloc[0:100], master.iloc[100:200],
                              master.iloc[200:300]])
    results = pd.concat(ret_list)
    print(datetime.now() - startTime)

which gives about the same time

runfile('C:/Users/john/untitled6.py', wdir='C:/Users/john')
0:00:01.927000

Question: why is multiprocessing (with both Dask and the multiprocessing package) so slow compared to plain Pandas here? Assume my real data is much bigger than this. Could I get a better outcome?

After all, the problem I consider here is embarrassingly parallel (every row is an independent problem), so these packages should really shine.

Am I missing something here?

Thanks!

Solution

Let me summarize the comments I made into something like an answer. I hope this information proves useful, as there are a number of issues rolled into one here.

First, I would like to point you to distributed.readthedocs.io/en/latest/efficiency.html, where a number of dask performance topics are discussed. Note that this is all in terms of the distributed scheduler, but since that can be started in-process, with threads or processes, or a combination of these, it really does supersede the previous dask schedulers and is generally recommended in all cases.
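As a small aside (my own sketch, not part of the original answer), the distributed scheduler can indeed be started entirely in-process, which makes it easy to try out before committing to separate worker processes:

from dask.distributed import Client

# processes=False keeps the scheduler and workers inside the current process,
# using threads only; omitting it would instead start local worker processes
client = Client(processes=False)
print(client)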

1) It takes time to create processes. This is always true, and particularly true on Windows. If you are interested in real-life performance, you will want to create the processes only once, paying their fixed overhead a single time, and then run many tasks against them. In dask there are many ways of making your cluster, even locally; a sketch follows below.
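For illustration, here is a minimal sketch (mine, not from the answer; the worker counts are arbitrary) of paying that start-up cost once with a local cluster and then reusing the workers for every compute() call:

from dask.distributed import Client, LocalCluster

# the worker processes are created here, a single time
cluster = LocalCluster(n_workers=3, threads_per_worker=1)
client = Client(cluster)   # registers itself as the default scheduler

# dmaster is the dask dataframe built in the question; compute() now runs on
# the already-running workers instead of spawning new processes each time
result = dmaster.compute()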

2) Every task that dask (or any other dispatcher) handles incurs some overhead. In the case of the distributed scheduler, this is <1ms, but in the case where the runtime of the task itself is very short, this can be significant.
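As a rough illustration (my own sketch), the number of tasks for an apply like the one in the question roughly tracks the number of partitions, so using fewer, larger partitions is one way to keep each task long enough to dwarf that per-task overhead:

# with ~300 rows, three partitions means three tasks of ~100 rows each
dmaster = dd.from_pandas(master, npartitions=3)
print(dmaster.npartitions)   # how many tasks the apply step will roughly produce
dmaster['my_value'] = dmaster.original.apply(
    lambda x: helper(x, slave), meta=('x', 'f8'))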

3) It is an anti-pattern in dask to load the whole dataset in the client and pass it to the worker(s). You want, instead, to use functions like dask.dataframe.read_csv, where the data is loaded by the workers, avoiding expensive serialization and inter-process communication. Dask is really good at moving the computation to where the data is, minimizing communication.
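A hypothetical sketch of that pattern (the CSV file names are made up; nothing like this appears in the question): let the workers read the large master table themselves instead of building it on the client and shipping it out. The small slave table can still be passed along cheaply:

import dask.dataframe as dd

# each worker reads its own file, so the big table is never serialized
# from the client process to the workers
dmaster = dd.read_csv('master_*.csv')
dmaster['my_value'] = dmaster.original.apply(
    lambda x: helper(x, slave), meta=('x', 'f8'))
result = dmaster.compute()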

4) When communicating between processes, the method of serialization matters, which is my guess at why non-dask multiprocessing is so slow for you.
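One quick way to get a feel for that cost (a rough sketch using only the standard library) is to pickle a single chunk of work by hand and see how large it is and how long the round trip takes:

import pickle
import time

chunk = master.iloc[0:100]            # one piece of work sent to a worker
t0 = time.perf_counter()
payload = pickle.dumps(chunk)         # what actually crosses the process boundary
chunk_back = pickle.loads(payload)    # what the worker has to rebuild on its side
print(len(payload), 'bytes, round trip took', time.perf_counter() - t0, 'seconds')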

5) Finally, not all jobs will see performance gains under dask. This depends on a number of things, but often the main one is: does the data comfortably fit in memory? If yes, it may be hard to match the well-optimized methods in numpy and pandas. As always, you should profile your code...
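A minimal example of that last piece of advice, using only the standard-library profiler on the plain-pandas version from the question (this assumes master, slave and helper are defined at the top level, as in the question's session):

import cProfile

# shows where the time actually goes, e.g. fuzz.token_set_ratio vs. pandas overhead
cProfile.run("master.original.apply(lambda x: helper(x, slave))", sort='cumtime')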
