Parallel python iteration


Problem Description

I want to create a number of instances of a class based on values in a pandas.DataFrame. This I've got down.

import itertools
import multiprocessing as mp
import pandas as pd

class Toy:
    id_iter = itertools.count(1)

    def __init__(self, row):
        self.id = next(self.id_iter)  # Python 3: call next() on the iterator
        self.type = row['type']

if __name__ == "__main__":

    table = pd.DataFrame({
        'type': ['a', 'b', 'c'],
        'number': [5000, 4000, 30000]
        })

    for index, row in table.iterrows():
        [Toy(row) for _ in range(row['number'])]

Multiprocessing Attempts

I've been able to parallelize this (sort of) by adding the following:

pool = mp.Pool(processes=mp.cpu_count())
m = mp.Manager()
q = m.Queue()

for index, row in table.iterrows():
    pool.apply_async([Toy(row) for _ in range(row['number'])])

It seems that this would be faster if the numbers in row['number'] are substantially larger than the length of table. But in my actual case, table is thousands of lines long, and each row['number'] is relatively small.

It seems smarter to try and break up table into cpu_count() chunks and iterate within the table. But now we're at the edge of my python skills.

I've tried things that the python interpreter screams at me for, like:

pool.apply_async(
        for index, row in table.iterrows(): 
        [Toy(row) for _ in range(row['number'])]
        )

And things that "can't be pickled", like:

Parallel(n_jobs=4)(
    delayed(Toy)([row for _ in range(row['number'])]) \
            for index, row in table.iterrows()
)

Edit

This may have gotten me a little bit closer, but I'm still not there. I create the class instances in a separate function,

def create_toys(row):
    [Toy(row) for _ in range(row['number'])]

....

Parallel(n_jobs=4, backend="threading")(
    (create_toys)(row) for i, row in table.iterrows()
)

but I'm told 'NoneType' object is not iterable.
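The "'NoneType' object is not iterable" error most likely comes from `create_toys` returning `None` (it has no return statement), compounded by the missing `delayed` wrapper in the `Parallel` call. A hedged sketch of the corrected joblib pattern, assuming joblib is installed and using a simplified picklable helper in place of `Toy`:

```python
from joblib import Parallel, delayed

def create_toys(type_, number):
    # Without this return, each call yields None and flattening
    # the results fails with "'NoneType' object is not iterable".
    return [(type_, i) for i in range(number)]

rows = [('a', 3), ('b', 2), ('c', 4)]
chunks = Parallel(n_jobs=2, backend="threading")(
    delayed(create_toys)(t, n) for t, n in rows
)
toys = [toy for chunk in chunks for toy in chunk]
print(len(toys))  # 9
```

`Parallel` returns one list per row, in input order, so the chunks still need to be flattened at the end.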

Recommended Answer

It's a little bit unclear to me what the output you are expecting is. Do you just want a big list of the form

[Toy(row_1) ... Toy(row_n)]

where each Toy(row_i) appears with multiplicity row_i.number?

Based on the answer mentioned by @JD Long I think you could do something like this:

import multiprocessing as mp

import numpy as np
import pandas as pd

def process(df):
    # Iterate over the chunk passed in, not the full table.
    L = []
    for index, row in df.iterrows():
        L += [Toy(row) for _ in range(row['number'])]
    return L

table = pd.DataFrame({
    'type': ['a', 'b', 'c']*10,
    'number': [5000, 4000, 30000]*10
    })

p = mp.Pool(processes=8)
split_dfs = np.array_split(table,8)    
pool_results = p.map(process, split_dfs)
p.close()
p.join()

# merging parts processed by different processes
result = [a for L in pool_results for a in L]
