Why do map() and multiprocessing.Pool.map() give different answers in Python?

Problem description

I have a strange problem. I have a file in the following format:

START
1
2
STOP
lllllllll
START
3
5
6
STOP

and I want to read the lines between START and STOP as blocks, and use my_f to process each block.

import itertools

def block_generator(file):
    with open(file) as lines:
        for line in lines:
            if line == 'START':
                # lazily yield the lines between START and STOP
                block = itertools.takewhile(lambda x: x != 'STOP', lines)
                yield block

and in my main function I tried to use map() to get the work done. It worked.

blocks=block_generator(file)
map(my_f,blocks)

will actually give me what I want. But when I tried the same thing with multiprocessing.Pool.map(), it gave me an error saying takewhile() expected 2 arguments but got 0.

    blocks=block_generator(file)
    p=multiprocessing.Pool(4) 
    p.map(my_f,blocks)

Is this a bug?

  1. The file has more than 1,000,000 blocks, each with fewer than 100 lines.
  2. I accepted the answer from untubu.
  3. But maybe I will simply split the file and use n instances of my original script (without multiprocessing) to process them, then cat the results together. That way nothing can go wrong as long as the script works on a small file. A rough sketch of that splitting step is shown after this list.
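
For what it's worth, here is a minimal sketch of that manual-splitting workaround. It is only an illustration of point 3 above, not code from the question or the answer: the output file names and the strip() calls are my own assumptions about the file format.

import itertools

def split_blocks(path, n):
    # Write complete START/STOP blocks round-robin into n smaller files,
    # so each part can be fed to a separate instance of the original script.
    outs = [open('%s.part%d' % (path, i), 'w') for i in range(n)]
    try:
        with open(path) as lines:
            count = 0
            for line in lines:
                if line.strip() == 'START':
                    out = outs[count % n]
                    out.write(line)
                    for body in itertools.takewhile(
                            lambda x: x.strip() != 'STOP', lines):
                        out.write(body)
                    out.write('STOP\n')
                    count += 1
    finally:
        for out in outs:
            out.close()

Each part could then be processed by a separate run of the original map()-based script and the outputs concatenated, as described above.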

Solution

How about:

import itertools
import multiprocessing

def grouper(n, iterable, fillvalue=None):
    # Source: http://docs.python.org/library/itertools.html#recipes
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    return itertools.izip_longest(*[iter(iterable)]*n,fillvalue=fillvalue)

def block_generator(file):
    with open(file) as lines:
        for line in lines:
            if line == 'START': 
                block=list(itertools.takewhile(lambda x:x!='STOP',lines))
                yield block

blocks=block_generator(file)
p=multiprocessing.Pool(4)
for chunk in grouper(100,blocks,fillvalue=''):
    p.map(my_f,chunk)

Using grouper will limit the amount of the file consumed by p.map. Thus the whole file need not be read into memory (fed into the task queue) at once.
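
As a quick illustration (this snippet is mine, not from the original answer, and assumes Python 2 to match the izip_longest and print usage above), here is what grouper yields; note that a short final batch is padded with the fillvalue, so with fillvalue='' the padding entries '' would also be passed to my_f:

import itertools

def grouper(n, iterable, fillvalue=None):
    # Same recipe as above: batch an iterable into tuples of length n.
    return itertools.izip_longest(*[iter(iterable)] * n, fillvalue=fillvalue)

# Seven items grouped in threes; the last tuple is padded with 'x'.
print list(grouper(3, 'ABCDEFG', 'x'))
# [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]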


I claimed above that when you call p.map(func, iterator), the entire iterator is consumed immediately to fill a task queue. The pool workers then get tasks from the queue and work on the jobs concurrently.

If you look inside pool.py and trace through the definitions, you will see the _handle_tasks thread gets items from self._taskqueue and enumerates them all at once:

         for i, task in enumerate(taskseq):
             ...
             put(task)

The conclusion is that the iterator passed to p.map gets consumed at once. There is no waiting for one task to end before the next is taken from the queue.

As further corroboration, if you run this:

Demonstration code:

import multiprocessing as mp
import time
import logging

def foo(x):
    time.sleep(1)
    return x*x

def blocks():
    # A generator that logs every 100th item, so we can see when it is consumed.
    for x in range(1000):
        if x%100==0:
            logger.info('Got here')
        yield x

logger=mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG) 
pool=mp.Pool() 
print pool.map(foo, blocks()) 

You will see the Got here message printed 10 times almost immediately, and then a long pause due to the time.sleep(1) call in foo. This clearly shows the iterator is fully consumed long before the pool processes get around to finishing the tasks.
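
As a follow-up sketch of my own (not part of the original answer), the same demo can be fed to the pool in grouper-sized batches; under these assumptions the Got here messages should then appear interleaved with the workers' progress instead of all at once:

import itertools
import multiprocessing as mp
import time
import logging

def grouper(n, iterable, fillvalue=None):
    return itertools.izip_longest(*[iter(iterable)] * n, fillvalue=fillvalue)

def foo(x):
    time.sleep(1)
    return x*x

def blocks():
    for x in range(1000):
        if x%100==0:
            logger.info('Got here')
        yield x

logger = mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG)
pool = mp.Pool()
results = []
for chunk in grouper(100, blocks(), fillvalue=0):
    # Each pool.map call consumes only 100 items from the generator.
    results.extend(pool.map(foo, chunk))
print results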
