Combining itertools and multiprocessing?


Question

I have a 256x256x256 NumPy array in which each element is a matrix. I need to do some calculations on each of these matrices, and I want to use the multiprocessing module to speed things up.

The results of these calculations must be stored in a 256x256x256 array like the original one, so that the result for the matrix at element [i,j,k] of the original array ends up in element [i,j,k] of the new array.

To do this, I want to make a list which could be written in a pseudo-ish way as [array[i,j,k], (i, j, k)] and pass it to a function to be "multiprocessed". Assuming that matrices is a list of all the matrices extracted from the original array and myfunc is the function doing the calculations, the code would look somewhat like this:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))
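As a small-scale check of the index construction above, the following sketch builds the same kind of index array for a tiny shape and scatters stand-in results back to their [i,j,k] positions. The shape (2, 3, 4) and the names values, out and same_as_reshape are illustrative assumptions, not part of the original code.

```python
import numpy as np

shape = (2, 3, 4)  # small stand-in for (256, 256, 256)

# Same construction as in the question: one (i, j, k) row per element.
inds = np.rollaxis(np.indices(shape), 0, 4).reshape(-1, 3)

# Stand-in "results": one number per element, listed in the same order as inds.
values = np.arange(inds.shape[0])

# Scatter each value back to its own [i, j, k] position.
out = np.empty(shape, dtype=values.dtype)
out[inds[:, 0], inds[:, 1], inds[:, 2]] = values

# The rows are in C (row-major) order, so a plain reshape gives the same array.
same_as_reshape = np.array_equal(out, values.reshape(shape))
```

Because the index rows come out in row-major order, and map-style pool calls return results in input order, reshaping the result list may be enough in cases where each result is a scalar or a fixed-shape array.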

However, it seems that map_async actually creates this huge finput list first: my CPUs aren't doing much, but the memory and swap get completely consumed within seconds, which is obviously not what I want.

Is there a way to pass this huge list to a multiprocessing function without having to create it explicitly first? Or do you know another way of solving this problem?

Thanks a lot! :-)

Recommended answer

All multiprocessing.Pool.map* methods consume iterators fully as soon as the function is called. To feed the map function one chunk of the iterator at a time, use grouper_nofill:

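import itertools

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it = iter(iterable)
    def take():
        # Yield successive lists of up to n items; yields [] once `it` is exhausted.
        while True:
            yield list(itertools.islice(it, n))
    # Python 2: call take().next until it returns the sentinel [].
    # (In Python 3, use take().__next__ here and zip instead of itertools.izip below.)
    return iter(take().next, [])

chunksize = 256
async_results = []
# Only one chunk of (matrix, index) pairs is materialized at a time.
for finput in grouper_nofill(chunksize, itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results = np.array(async_results)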
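The same chunk-at-a-time idea can be sketched for Python 3 as follows. This is a minimal sketch under stated assumptions, not the answer's original code: map_in_chunks and square_with_index are hypothetical names, square_with_index stands in for myfunc, and multiprocessing.pool.ThreadPool (which exposes the same map interface as Pool) is used only so the snippet runs without process start-up concerns.

```python
import itertools
from multiprocessing.pool import ThreadPool

def grouper_nofill(n, iterable):
    """Yield lists of up to n items; the last list may be shorter."""
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk

def map_in_chunks(pool, func, iterable, chunksize):
    """Apply func through pool.map, materializing only one chunk at a time."""
    results = []
    for chunk in grouper_nofill(chunksize, iterable):
        results.extend(pool.map(func, chunk))
    return results

def square_with_index(pair):
    # Hypothetical stand-in for myfunc: returns (result, index).
    value, index = pair
    return (value * value, index)

# Lazy (value, index) pairs; nothing is built up front.
lazy_input = zip(range(10), itertools.count())

with ThreadPool(2) as pool:
    chunked_results = map_in_chunks(pool, square_with_index, lazy_input, chunksize=4)
```

With a process pool, multiprocessing.Pool() can be passed in place of the ThreadPool; in that case the pool creation should sit under an `if __name__ == "__main__":` guard so that worker processes can import the module safely.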

PS. pool.map_async's chunksize parameter does something different: it breaks the iterable into chunks, then gives each chunk to a worker process, which calls map(func, chunk). This can give the worker process more data to chew on if func(item) finishes too quickly, but it does not help in your situation, since the iterator still gets consumed fully immediately after the map_async call is issued.
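The eager consumption described here can be observed with a small experiment. The sketch below is an illustration under assumptions: tracked, identity and pulled are hypothetical names, and multiprocessing.pool.ThreadPool is used in place of a process pool because it shares the same map_async implementation and keeps the snippet simple to run.

```python
from multiprocessing.pool import ThreadPool

pulled = []  # records every item taken from the generator

def tracked(n):
    # Generator that logs each item at the moment it is consumed.
    for i in range(n):
        pulled.append(i)
        yield i

def identity(x):
    return x

with ThreadPool(2) as pool:
    pending = pool.map_async(identity, tracked(100), chunksize=10)
    # Counted right after the call returns, before any result is requested.
    pulled_at_call = len(pulled)
    results = pending.get()
```

On CPython, pulled_at_call should already equal the full input length here, even with chunksize set, which matches the behavior the answer describes.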
