Do multiprocessing pools give every process the same number of tasks, or are they assigned as available?

Problem description

When you map an iterable to a multiprocessing.Pool, are the iterations divided into a queue for each process in the pool at the start, or is there a common queue from which a task is taken when a process comes free?

    import multiprocessing

    def generate_stuff():
        for foo in range(100):
            yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

So, given this untested suggestion code: if there are 4 processes in the pool, does each process get allocated 25 stuffs to do, or do the 100 stuffs get picked off one by one by processes looking for stuff to do, so that each process might do a different number of stuffs, e.g. 30, 26, 24, 20?

Solution

> So, given this untested suggestion code: if there are 4 processes in the pool, does each process get allocated 25 stuffs to do, or do the 100 stuffs get picked off one by one by processes looking for stuff to do, so that each process might do a different number of stuffs, e.g. 30, 26, 24, 20?

Well, the obvious answer is to test it.

As-is, the test may not tell you much, because the jobs will finish almost immediately, and it's possible that things end up evenly distributed even if the pooled processes grab jobs as they become ready. But there's an easy way to fix that:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)

If the numbers are "jagged", you know that the pooled processes must be grabbing new jobs as they become ready. (I explicitly set chunksize to 1 to make sure the chunks aren't so big that each process only gets one chunk in the first place.)

When I run it on an 8-core machine:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

So, it looks like the processes are getting new jobs on the fly.

Since you specifically asked about 4 workers, I changed Pool() to Pool(4) and got this:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})

However, there's an even better way to find out than by testing: read the source.
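
If you want to follow along, a quick way to locate the file to read for your own interpreter (assuming a standard CPython install where multiprocessing/pool.py ships as plain source) is:

    import multiprocessing.pool
    print(multiprocessing.pool.__file__)   # prints the path to pool.py for this interpreter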

If you look at the source, map just calls map_async, which creates a bunch of batches and puts them on a self._taskqueue object (a Queue.Queue instance). If you read further, this queue isn't shared with the other processes directly; instead, there's a pool manager thread that, whenever a process finishes and returns a result, pops the next job off the queue and submits it to that process.
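
That split is easy to see from the public API alone: map is essentially map_async(...).get(). Here is a minimal, runnable sketch of that (slow_double is a made-up example function, not from the original code):

    import multiprocessing
    import time

    def slow_double(x):
        time.sleep(0.1)
        return x * 2

    if __name__ == '__main__':
        pool = multiprocessing.Pool(4)
        # map_async returns right away; the pool's manager thread keeps handing
        # batches from the task queue to whichever worker comes free.
        async_result = pool.map_async(slow_double, range(20), chunksize=1)
        print(async_result.ready())   # almost certainly False: workers are still busy
        print(async_result.get())     # blocks until every batch has been dispatched and finished
        pool.close()
        pool.join()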

This is also how you can find out what the default chunksize is for map. The 2.7 implementation shows that it's just len(iterable) / (len(self._pool) * 4), rounded up (the real code is slightly more verbose in order to avoid fractional arithmetic); put another way, it's just big enough for about 4 chunks per process. But you really shouldn't rely on this: the documentation vaguely and indirectly implies that some kind of heuristic is used, but gives no guarantee about what it will be. So, if you really need "about 4 chunks per process", calculate it explicitly. More realistically, if you ever need anything besides the default, you probably need a domain-specific value that you work out by calculation, guessing, or profiling.
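
For instance, a minimal sketch of calculating "about 4 chunks per process" yourself might look like this (explicit_chunksize and identity are made-up helper names for the example; the rounding mirrors the "divide by workers * 4 and round up" behaviour described above):

    import multiprocessing

    def identity(x):
        return x

    def explicit_chunksize(n_items, n_workers, chunks_per_worker=4):
        # len(iterable) / (workers * chunks_per_worker), rounded up,
        # using integer arithmetic to avoid fractions.
        chunksize, extra = divmod(n_items, n_workers * chunks_per_worker)
        if extra:
            chunksize += 1
        return chunksize

    if __name__ == '__main__':
        workers = 4
        items = list(range(100))
        size = explicit_chunksize(len(items), workers)   # 100 / (4 * 4) rounded up -> 7
        pool = multiprocessing.Pool(workers)
        print(pool.map(identity, items, chunksize=size))
        pool.close()
        pool.join()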
