Combining itertools and multiprocessing?

Problem Description

I have a 256x256x256 Numpy array, in which each element is a matrix. I need to do some calculations on each of these matrices, and I want to use the multiprocessing module to speed things up.

The results of these calculations must be stored in a 256x256x256 array like the original one, so that the result of the matrix at element [i,j,k] in the original array must be put in the [i,j,k] element of the new array.

To do this, I want to make a list which could be written in a pseudo-ish way as [array[i,j,k], (i, j, k)] and pass it to a function to be "multiprocessed". Assuming that matrices is a list of all the matrices extracted from the original array and myfunc is the function doing the calculations, the code would look somewhat like this:

import multiprocessing
import numpy as np
from itertools import izip
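# NB: izip is Python 2 only; on Python 3, the built-in zip is already lazy.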

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)
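# inds now has shape (256**3, 3): one (i, j, k) index triple per element.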

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))

However, it seems like map_async is actually creating this huge finput list first: my CPUs aren't doing much, but the memory and swap get completely consumed in a matter of seconds, which is obviously not what I want.

Is there a way to pass this huge list to a multiprocessing function without the need to explicitly create it first? Or do you know another way of solving this problem?

Thanks a bunch! :-)

Recommended Answer

All multiprocessing.Pool.map* methods consume the iterator fully (demo code) as soon as the function is called. To feed chunks of the iterator to the map function one chunk at a time, use grouper_nofill:

import itertools

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it = iter(iterable)
    def take():
        # Yield successive chunks of at most n items; an empty list means exhausted.
        while True:
            yield list(itertools.islice(it, n))
    # iter(callable, sentinel) stops once take() yields the empty-list sentinel.
    return iter(take().next, [])  # Python 2; on Python 3 use take().__next__

chunksize = 256
async_results = []
for finput in grouper_nofill(chunksize, itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results = np.array(async_results)
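
Since each worker returns a (result, index) pair, the collected results can then be scattered back into a fresh 256x256x256 array, as the question requires. A minimal sketch, assuming each result is a scalar (the output shape would need adjusting if myfunc returns an array per matrix):

output = np.empty((256, 256, 256))
for result, idx in async_results:
    # idx is the (i, j, k) triple that was zipped with the matrix.
    output[tuple(idx)] = result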

PS. pool.map_async's chunksize parameter does something different: it breaks the iterable into chunks, then gives each chunk to a worker process, which calls map(func, chunk). This can give a worker process more data to chew on if func(item) finishes too quickly, but it does not help in your situation, since the iterator still gets consumed fully as soon as the map_async call is issued.
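
For illustration, this is how chunksize would be passed (the value 8 is an arbitrary assumption); it changes only how the already-materialized input is split among workers, not when it is consumed:

# chunksize controls task granularity, not memory use: the whole
# iterable is still consumed up front, then split into chunks of 8.
results = pool.map_async(myfunc, finput, chunksize=8).get()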
