Pandas df.iterrows() parallelization

Question

I want to parallelize the following code:

for idx, row in df.iterrows():
    k = row['Chromosome']
    start, end = row['Bin'].split('-')

    sequence = sequence_from_coordinates(k, 1, start, end)  # slow download from HTTP

    # df.at replaces set_value, which was removed in pandas 1.0
    df.at[idx, 'GC%'] = gc_content(sequence, percent=False, verbose=False)
    df.at[idx, 'G4 repeats'] = sum(len(list(i)) for i in g4_scanner(sequence))
    df.at[idx, 'max flexibility'] = max(item[1] for item in dna_flex(sequence, verbose=False))

I have tried to use multiprocessing.Pool() since each row can be processed independently, but I can't figure out how to share the DataFrame. I am also not sure that this is the best approach to do parallelization with pandas. Any help?

Recommended answer

As @Khris said in his comment, you should split up your dataframe into a few large chunks and iterate over each chunk in parallel. You could arbitrarily split the dataframe into randomly sized chunks, but it makes more sense to divide the dataframe into equally sized chunks based on the number of processes you plan on using. Luckily someone else has already figured out how to do that part for us:

# don't forget to import
import pandas as pd
import multiprocessing

# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()

# calculate the chunk size as an integer (at least 1, so range() never gets a zero step)
chunk_size = max(1, df.shape[0] // num_processes)

# this solution was reworked from the above link.
# will work even if the length of the dataframe is not evenly divisible by num_processes
# (.iloc replaces .ix, which was removed in pandas 1.0)
chunks = [df.iloc[i:i + chunk_size] for i in range(0, df.shape[0], chunk_size)]
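
As an alternative sketch for the chunking step, NumPy's array_split can divide the row positions into exactly n nearly equal groups, so the number of chunks matches the number of processes. The helper name split_dataframe and the sample frame are hypothetical, used only for illustration.

```python
import numpy as np
import pandas as pd


def split_dataframe(df, n_chunks):
    """Split df into at most n_chunks pieces whose lengths differ by at most one row."""
    # array_split accepts lengths that are not evenly divisible by n_chunks
    positions = np.array_split(np.arange(len(df)), n_chunks)
    # drop empty groups, which appear when n_chunks exceeds the row count
    return [df.iloc[pos] for pos in positions if len(pos) > 0]


# hypothetical sample data, for illustration only
sample = pd.DataFrame({"value": range(10)})
chunks = split_dataframe(sample, 4)
print([len(c) for c in chunks])
```

Splitting by position with .iloc keeps each chunk's original index labels, so the results can still be matched back to the source rows afterwards.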

This creates a list that contains our dataframe in chunks. Now we need to pass it into our pool along with a function that will manipulate the data.

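def func(d):
    # let's create a function that squares every value in the dataframe
    return d * d

# create our pool with `num_processes` processes
# (on platforms that spawn workers, such as Windows, run this under `if __name__ == "__main__":`)
pool = multiprocessing.Pool(processes=num_processes)

# apply our function to each chunk in the list
result = pool.map(func, chunks)

# release the worker processes once the work is done
pool.close()
pool.join()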

At this point, result will be a list holding each chunk after it has been manipulated. In this case, all values have been squared. The issue now is that the original dataframe has not been modified, so we have to replace all of its existing values with the results from our pool.

for chunk in result:
    # since each chunk is just a dataframe
    # we can reassign the original dataframe based on the index of each chunk
    # (.loc replaces .ix, which was removed in pandas 1.0)
    df.loc[chunk.index] = chunk
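
When the worker function adds new columns to each chunk, concatenating the processed chunks can be simpler than assigning back by index. The following is a minimal sketch, assuming each chunk keeps its original index; the sample frame and the squared column are hypothetical stand-ins for what pool.map would return.

```python
import pandas as pd

# hypothetical data standing in for the original dataframe
df = pd.DataFrame({"x": [1, 2, 3, 4]}, index=["a", "b", "c", "d"])

# stand-ins for the processed chunks that pool.map would return
result = [
    df.iloc[:2].assign(squared=lambda d: d["x"] ** 2),
    df.iloc[2:].assign(squared=lambda d: d["x"] ** 2),
]

# stitch the chunks back together; pool.map returns results in input order
combined = pd.concat(result)
print(combined)
```

Because pool.map preserves the order of its inputs, the concatenated frame has the same row order as the original.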

Now, my function to manipulate my dataframe is vectorized and would likely have been faster if I had simply applied it to the entire dataframe instead of splitting it into chunks. However, in your case, your function would iterate over each row of each chunk and then return the chunk. This allows you to process num_processes rows at a time.

def func(d):
    d = d.copy()  # work on a copy so we never write into a slice of the original
    for idx, row in d.iterrows():
        k = row['Chromosome']
        start, end = row['Bin'].split('-')

        sequence = sequence_from_coordinates(k, 1, start, end)  # slow download from HTTP
        # d.at replaces set_value, which was removed in pandas 1.0
        d.at[idx, 'GC%'] = gc_content(sequence, percent=False, verbose=False)
        d.at[idx, 'G4 repeats'] = sum(len(list(i)) for i in g4_scanner(sequence))
        d.at[idx, 'max flexibility'] = max(item[1] for item in dna_flex(sequence, verbose=False))
    # return the chunk!
    return d

Then you reassign the values in the original dataframe, and you have successfully parallelized this process.
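
Putting the pieces together, the whole pattern might look like the sketch below. It is not the asker's actual pipeline: fetch_sequence is a hypothetical stand-in for the slow sequence_from_coordinates download, the "GC count" column is a simplified placeholder for the real metrics, and parallel_apply is an illustrative helper name.

```python
import multiprocessing

import pandas as pd


def fetch_sequence(chromosome, start, end):
    # hypothetical stand-in for the slow sequence_from_coordinates download
    return ("GC" * 50)[: int(end) - int(start)]


def process_chunk(chunk):
    chunk = chunk.copy()  # avoid writing into a slice of the caller's dataframe
    chunk["GC count"] = 0  # simplified placeholder for the real metrics
    for idx, row in chunk.iterrows():
        start, end = row["Bin"].split("-")
        sequence = fetch_sequence(row["Chromosome"], start, end)
        chunk.at[idx, "GC count"] = sequence.count("G") + sequence.count("C")
    return chunk


def parallel_apply(df, func, num_processes):
    # split into roughly equal chunks, process them in a pool, then reassemble
    chunk_size = max(1, len(df) // num_processes)
    chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
    with multiprocessing.Pool(processes=num_processes) as pool:
        result = pool.map(func, chunks)
    return pd.concat(result)
```

To use it, call parallel_apply(df, process_chunk, num_processes) from code that sits under an if __name__ == "__main__": guard, so that worker processes can import the module without re-running the pool setup.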

Your optimal performance is going to depend on how many processes you use. While "ALL OF THE PROCESSES!!!!" is one answer, a better answer is much more nuanced. After a certain point, throwing more processes at a problem actually creates more overhead than it's worth. A related limit is Amdahl's Law: the part of the work that cannot be parallelized caps the speedup no matter how many processes you add. Again, we are fortunate that others have already tackled this question for us:

  1. Python multiprocessing's Pool process limit
  2. How many processes should I run in parallel?
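
To make the diminishing returns concrete, Amdahl's Law gives the best-case speedup as 1 / ((1 - p) + p / n), where p is the fraction of the work that can run in parallel and n is the number of processes. The sketch below evaluates it for an assumed p of 0.9, a figure chosen only for illustration.

```python
def amdahl_speedup(parallel_fraction, num_processes):
    """Upper bound on speedup when only parallel_fraction of the work scales."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / num_processes)


# assumed workload: 90% of the runtime can be parallelized
for n in (1, 2, 4, 8, 16):
    print(n, round(amdahl_speedup(0.9, n), 2))
```

The formula ignores the cost of starting processes and pickling chunks, so real gains are lower than this bound.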

A good default is to use multiprocessing.cpu_count(), which is the default behavior of multiprocessing.Pool. According to the documentation, "If processes is None then the number returned by cpu_count() is used." That's why I set num_processes to multiprocessing.cpu_count() at the beginning. This way, if you move to a beefier machine, you get the benefits from it without having to change the num_processes variable directly.
