如何获得“工作量"?剩下要由Python多处理池来完成? [英] How to get the amount of "work" left to be done by a Python multiprocessing Pool?

查看:47
本文介绍了如何获得“工作量"?剩下要由Python多处理池来完成?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

到目前为止,每当我需要使用 multiprocessing 时,我都这样做通过手动创建进程池"并与所有子进程共享工作队列.

So far whenever I needed to use multiprocessing I have done so by manually creating a "process pool" and sharing a working Queue with all subprocesses.

例如:

from multiprocessing import Process, Queue


class MyClass:

    def __init__(self, num_processes):
        self._log         = logging.getLogger()
        self.process_list = []
        self.work_queue   = Queue()
        for i in range(num_processes):
            p_name = 'CPU_%02d' % (i+1)
            self._log.info('Initializing process %s', p_name)
            p = Process(target = do_stuff,
                        args   = (self.work_queue, 'arg1'),
                        name   = p_name)

这样,我可以将内容添加到队列中,这些内容将由子流程消耗.然后,我可以通过检查Queue.qsize():

This way I could add stuff to the queue, which would be consumed by the subprocesses. I could then monitor how far the processing was by checking the Queue.qsize():

    while True:
        qsize = self.work_queue.qsize()
        if qsize == 0:
            self._log.info('Processing finished')
            break
        else:
            self._log.info('%d simulations still need to be calculated', qsize)

现在,我认为 multiprocessing.Pool 可以简化了这段代码.

Now I figure that multiprocessing.Pool could simplify a lot this code.

我无法找到的是如何监视仍然要做的工作"的数量.

What I couldn't find out is how can I monitor the amount of "work" still left to be done.

以下面的示例为例:

from multiprocessing import Pool


class MyClass:

    def __init__(self, num_processes):
        self.process_pool = Pool(num_processes)
        # ...
        result_list = []
        for i in range(1000):            
            result = self.process_pool.apply_async(do_stuff, ('arg1',))
            result_list.append(result)
        # ---> here: how do I monitor the Pool's processing progress?
        # ...?

有什么想法吗?

推荐答案

使用Manager队列.这是工作进程之间共享的队列.如果您使用普通队列,则每个工作人员都会对其进行腌制和解酸,因此将其复制,以使每个工作人员无法更新该队列.

Use a Manager queue. This is a queue that is shared between worker processes. If you use a normal queue it will get pickled and unpickled by each worker and hence copied, so that the queue can't be updated by each worker.

然后,您的工作人员将材料添加到队列中,并在工作人员工作时监视队列的状态.您需要使用map_async进行此操作,因为这样可以查看整个结果何时准备就绪,从而可以中断监视循环.

You then have your workers add stuff to the queue and monitor the queue's state while the workers are working. You need to do this using map_async as this lets you see when the entire result is ready, allowing you to break the monitoring loop.

示例:

import time
from multiprocessing import Pool, Manager


def play_function(args):
    """Mock function, that takes a single argument consisting
    of (input, queue). Alternately, you could use another function
    as a wrapper.
    """
    i, q = args
    time.sleep(0.1)  # mock work
    q.put(i)
    return i

p = Pool()
m = Manager()
q = m.Queue()

inputs = range(20)
args = [(i, q) for i in inputs]
result = p.map_async(play_function, args)

# monitor loop
while True:
    if result.ready():
        break
    else:
        size = q.qsize()
        print(size)
        time.sleep(0.1)

outputs = result.get()

这篇关于如何获得“工作量"?剩下要由Python多处理池来完成?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆