How to use a generator as an iterable with the multiprocessing map function


Problem description


When I pass a generator as the iterable argument to multiprocessing.Pool.map:

pool.map(func, iterable=(x for x in range(10)))

It seems that the generator is fully exhausted before func is ever called.

I want each item to be passed to a worker process as soon as it is yielded. Thanks!
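A minimal sketch that reproduces the observation. It uses multiprocessing.pool.ThreadPool instead of multiprocessing.Pool only so it runs without pickling or an `if __name__ == "__main__":` guard in the workers; ThreadPool is a subclass of Pool and goes through the same map code. The names trace_map, produce and work are hypothetical.

```python
from multiprocessing.pool import ThreadPool


def trace_map(n, workers=2):
    """Run pool.map over a generator and record the order of events."""
    events = []  # list.append is safe to call from several threads in CPython

    def produce():
        # The generator argument: records each item as it is yielded.
        for i in range(n):
            events.append(("yield", i))
            yield i

    def work(x):
        # The worker function: records each call it receives.
        events.append(("call", x))
        return x * 10

    with ThreadPool(workers) as pool:
        results = pool.map(work, produce())
    return results, events


if __name__ == "__main__":
    results, events = trace_map(5)
    print(results)     # [0, 10, 20, 30, 40]
    print(events[:5])  # all five "yield" events, before any "call" event
```

Every "yield" event is recorded before the first "call" event, which matches what the question describes.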

Solution

multiprocessing.Pool.map converts any iterable without a __len__ method to a list before processing. This is done so it can calculate chunksize, which the pool uses to group worker arguments into batches and reduce the round-trip cost of scheduling jobs. This is not optimal, especially when chunksize is 1, but since map must exhaust the iterator one way or the other, it's usually not a significant issue.
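To make the chunksize argument concrete, here is the default rule from pool.py rewritten as a standalone function, plus a hypothetical chunk helper that batches a list. This is an illustration of the arithmetic under those assumptions, not the pool's actual batching code. The rule needs len(), which a generator does not provide.

```python
def default_chunksize(n_items, n_workers):
    """Mirror of the default rule in Pool._map_async:
    ceil(n_items / (4 * n_workers)), or 0 for an empty input."""
    # Needs the total length up front, which is why map lists a generator.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    if n_items == 0:
        chunksize = 0
    return chunksize


def chunk(items, size):
    """Hypothetical helper: group a list into consecutive batches of `size`."""
    return [items[i:i + size] for i in range(0, len(items), size)]


if __name__ == "__main__":
    items = list(range(100))
    size = default_chunksize(len(items), n_workers=4)
    print(size)                     # 7
    print(len(chunk(items, size)))  # 15 batches instead of 100 separate tasks
```

With 4 workers and 100 items, divmod(100, 16) is (6, 4), so the remainder bumps the chunk size to 7.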

The relevant code is in multiprocessing/pool.py. Notice its use of len:

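# Excerpt from Pool._map_async in multiprocessing/pool.py (the rest of the method is omitted).
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
        error_callback=None):
    '''
    Helper function to implement map, starmap and their async counterparts.
    '''
    if self._state != RUN:
        raise ValueError("Pool not running")
    # A generator has no __len__, so it is consumed into a list right here,
    # before any task is sent to a worker.
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)

    # Default chunksize = ceil(len(iterable) / (4 * number of workers)).
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0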
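If the goal is for each item to reach a worker as soon as the generator yields it, one option not covered in the answer above is Pool.imap (or imap_unordered), which does not call list() on its input; the generator is iterated by the pool's background task-handler thread instead. The sketch below makes that observable: the hypothetical gated_numbers generator pauses after its first item until the caller has received the first result. With imap this completes; with map, list() runs in the calling thread, so nothing can unblock the generator and a timeout turns the hang into an error. ThreadPool again stands in for Pool only to keep the example self-contained.

```python
import threading
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


def gated_numbers(n, first_result_seen, timeout):
    """Hypothetical generator: yield 0, then pause until the caller has
    received the first result (or give up after `timeout` seconds)."""
    yield 0
    if not first_result_seen.wait(timeout):
        raise RuntimeError("generator was drained before any result came back")
    yield from range(1, n)


def squares_with_imap(n, timeout=5.0):
    first_result_seen = threading.Event()
    with ThreadPool(2) as pool:
        # imap hands the generator to the pool's task-handler thread,
        # so this call returns without consuming the generator here.
        results = pool.imap(square, gated_numbers(n, first_result_seen, timeout))
        first = next(results)    # result for 0 arrives while the generator is paused
        first_result_seen.set()  # let the generator produce the rest
        return [first, *results]


def squares_with_map(n, timeout=0.5):
    first_result_seen = threading.Event()
    with ThreadPool(2) as pool:
        # map calls list() on the generator in this thread first, so nobody
        # can set the event: the generator times out and raises.
        return pool.map(square, gated_numbers(n, first_result_seen, timeout))


if __name__ == "__main__":
    print(squares_with_imap(5))  # [0, 1, 4, 9, 16]
    try:
        squares_with_map(5)
    except RuntimeError as exc:
        print("map:", exc)
```

Two caveats: imap defaults to chunksize=1, so for long inputs a larger chunksize is usually faster; and the task-handler thread queues items as fast as the generator yields them, so imap does not by itself limit how far the generator runs ahead of the workers.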
