multiprocessing - pool allocation

Problem description

I noticed this behavior in Python's pool allocation. Even though I have 20 processes in the pool, when I call map_async with, say, 8 tasks, instead of dispatching all of them at once, only 4 start executing. When those 4 finish, it sends two more, and when those two finish, it sends one.

When I throw more than 20 tasks at it, it runs all 20 until the queue drops below 20 items, at which point the behavior above repeats.

I assume this is done on purpose, but it looks weird. My goal is to have requests processed as soon as they come in, and this behavior obviously does not fit that.

Using Python 2.6 with billiard for maxtasksperchild support.

Any ideas how I can improve it?

Code:

import time
from billiard import pool  # billiard is used for maxtasksperchild support

mypool = pool.Pool(processes=settings['num-processes'],
                   initializer=StartChild,
                   maxtasksperchild=10)

while True:
    lines = DbData.GetAll()
    if len(lines) > 0:
        print 'Starting to process: ', len(lines), ' urls'
        Res = mypool.map_async(RunChild, lines)
        Returns = Res.get(None)
        print 'Pool returns: ', Returns
    else:
        time.sleep(0.5)
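
One thing worth checking against the batching described above is map_async's chunksize parameter: Pool.map_async splits the input iterable into chunks before dispatching them (by default the chunk size is derived from the iterable length and the pool size), so tasks can arrive at the workers in groups. Passing an explicit chunksize of 1 guarantees one task per dispatch; a minimal sketch against the code above:

# chunksize=1 forces the pool to hand tasks out one at a time
# instead of grouping them into batches
Res = mypool.map_async(RunChild, lines, 1)  # 3rd argument is chunksize
Returns = Res.get(None)

Whether this removes the 4/2/1 pattern depends on the billiard version, but it is cheap to rule out.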

Recommended answer

One way I deal with multiprocessing in Python is the following:

I have data on which I want to run a function, function().
First I create a multiprocessing.Process subclass:

import multiprocessing

class ProcessThread(multiprocessing.Process):
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue):
        self.id_t = id_t
        self.inputqueue = inputqueue
        self.idqueue = idqueue
        self.function = function
        self.resultqueue = resultqueue

        multiprocessing.Process.__init__(self)

    def run(self):
        print "process number: " + str(self.id_t) + " starting"

        while self.inputqueue.qsize() > 0:
            try:
                inp = self.inputqueue.get()
            except Exception:
                # another worker may have drained the queue between
                # the qsize() check and the get()
                continue
            result = self.function(inp)
            while 1:
                try:
                    self.resultqueue.put([self.id_t, result])
                except Exception:
                    pass
                else:
                    break
        # signal that this worker has finished
        self.idqueue.put(self.id_t)
        return

And the main part:

inputqueue = multiprocessing.Queue()
resultqueue = multiprocessing.Queue()
idqueue = multiprocessing.Queue()

def function(data):
    print data  # or whatever you want to do with each item

# 'data' is the iterable of work items; 'nbprocess' is the number of workers
for datum in data:
    inputqueue.put(datum)

for i in xrange(nbprocess):
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start()

And finally get the results:

results = []
# busy-wait until every worker has pushed its id, i.e. all of them are done
while idqueue.qsize() < nbprocess:
    pass
while resultqueue.qsize() > 0:
    results.append(resultqueue.get())
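
As a side note, instead of spinning on idqueue.qsize(), the Process handles can be kept and join()ed. One multiprocessing caveat applies: a worker may not exit until everything it put on a queue has been consumed, so the result queue should be drained while joining. A minimal sketch of that variant, reusing the hypothetical names (nbprocess, function, the queues) from above:

from Queue import Empty  # multiprocessing.Queue raises Queue.Empty on timeout

workers = [ProcessThread(i, inputqueue, idqueue, function, resultqueue)
           for i in xrange(nbprocess)]
for w in workers:
    w.start()

results = []
# drain results while the workers run, so join() cannot block on a full pipe
while any(w.is_alive() for w in workers) or resultqueue.qsize() > 0:
    try:
        results.append(resultqueue.get(timeout=0.1))
    except Empty:
        pass

for w in workers:
    w.join()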

In this way you have perfect control over what each process is doing. Using a multiprocessing input queue is an efficient technique only if the computation on each datum is fairly slow (roughly a second or more per item), because of the concurrent access of the different processes to the queues (which is why I use the exception handling above). If your function computes very quickly, consider splitting the data only once at the beginning and handing each process its chunk of the dataset at start-up, as sketched below.
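
A minimal sketch of that pre-chunking variant (chunkify and ChunkWorker are hypothetical names, not part of the answer above):

import multiprocessing

def chunkify(data, n):
    # split data into n roughly equal slices, one per process
    k = (len(data) + n - 1) // n
    return [data[i:i + k] for i in xrange(0, len(data), k)]

class ChunkWorker(multiprocessing.Process):
    def __init__(self, chunk, function, resultqueue):
        multiprocessing.Process.__init__(self)
        self.chunk = chunk
        self.function = function
        self.resultqueue = resultqueue

    def run(self):
        # each worker owns its slice outright, so there is no input-queue
        # contention; results are pushed as they are produced
        for item in self.chunk:
            self.resultqueue.put(self.function(item))

resultqueue = multiprocessing.Queue()
chunks = chunkify(data, nbprocess)  # 'data' and 'nbprocess' as above
for c in chunks:
    ChunkWorker(c, function, resultqueue).start()

Each process then touches the shared queue only once per result instead of twice per datum, which is what makes this pay off for fast functions.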
