Multiprocessing pool and queues


Problem Description


I am using multiprocessing with pools. I need to pass a structure as an argument to a function that is to be used in separate processes. I cannot use the mapping functions of multiprocessing.Pool, since I can duplicate neither a multiprocessing.Queue nor a multiprocessing.Array. The structure is to be used on the fly to log the result of each terminated process. Here is my code:

import multiprocessing
from multiprocessing import Process, Manager, Queue, Array
import itertools
import time

def do_work(number, out_queue=None):
    if out_queue is not None:
        print "Treated nb ", number
        out_queue.append("Treated nb " + str(number))
    return 0


def multi_run_wrapper(iter_values):
    return do_work(*iter_values)

def test_pool():
    # Get the max cpu
    nb_proc = multiprocessing.cpu_count()

    pool = multiprocessing.Pool(processes=nb_proc)
    total_tasks = 16
    tasks = range(total_tasks)

    out_queue = Queue()  # Use it instead of out_array, and change out_queue.append() into out_queue.put() in do_work().
    out_array = Array('i', total_tasks)
    iter_values = itertools.izip(tasks, itertools.repeat(out_array))
    results = pool.map_async(multi_run_wrapper, iter_values)

    pool.close()
    pool.join()
    print results._value
    while not out_queue.empty():
        print "queue: ", out_queue.get()
    print "out array: \n", out_array

if __name__ == "__main__":
    test_pool()


I need to launch a worker in a detached process and pass my output queue as an argument. I also want the pool to contain a limited number of running processes. For that I am using the pool.map_async() function. Unfortunately, the piece of code above gives me an error:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
    assert_spawning(self)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance


I believe this is because a Queue can never be copied, as I read in the docs. I then thought of making the queue a global variable so that I would not need to pass it anymore, but that seems messy to me. I also thought of using a multiprocessing.Array instead:

out_array = Array('i', total_tasks)


but the same error is raised as with queues:

# ...
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance


I need to use this feature - multiprocessing with information exchanged between subprocesses - in a relatively big piece of software, so I want my code to remain clean and tidy.


How can I pass the queue to my worker in an elegant way?


Of course, any other way of dealing with the main specification is welcome.

Recommended Answer


multiprocessing.Pool will not accept a multiprocessing.Queue as an argument in its work queue. I believe this is because it internally uses queues to send data back and forth to the worker processes. There are a couple of workarounds:


1) Do you really need to use a queue? An advantage of the Pool functions is that their return values are sent back to the main process. It is generally better to iterate over the return values from a pool than to use a separate queue. This also avoids the race condition introduced by checking queue.empty().
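A minimal sketch of this first approach, rewriting do_work() so that it simply returns its result (shown in Python 3 syntax, unlike the Python 2 code in the question):

```python
import multiprocessing

def do_work(number):
    # Return the result; Pool sends it back to the main process for us,
    # so no shared queue or array is needed.
    return "Treated nb " + str(number)

if __name__ == "__main__":
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(do_work, range(16))
    for line in results:
        print(line)
```

pool.map() returns the results in task order, so the logging structure from the question becomes just a plain list in the parent process.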


2) If you must use a Queue, you can use one from multiprocessing.Manager. This is a proxy to a shared queue which can be passed as an argument to the Pool functions.
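A sketch of the Manager approach (Python 3 syntax; the tuple-argument wrapper stands in for the question's multi_run_wrapper):

```python
import multiprocessing

def do_work(args):
    number, out_queue = args
    out_queue.put("Treated nb " + str(number))
    return 0

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    out_queue = manager.Queue()  # a picklable proxy, unlike multiprocessing.Queue
    total_tasks = 16
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(do_work, [(n, out_queue) for n in range(total_tasks)])
    # map() has returned, so every worker is done and the queue is complete.
    while not out_queue.empty():
        print(out_queue.get())
```

Because the proxy forwards each put() to the manager process, this carries some IPC overhead, but it drops into the question's code with almost no restructuring.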


3) You can pass a normal Queue to worker processes by using an initializer when creating the Pool (like https://stackoverflow.com/a/3843313). This is kinda hacky.
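A sketch of that initializer trick (Python 3 syntax; the names _out_queue and init_worker are my choices for illustration):

```python
import multiprocessing

_out_queue = None  # module-level slot, filled in per worker by the initializer

def init_worker(q):
    global _out_queue
    _out_queue = q

def do_work(number):
    _out_queue.put("Treated nb " + str(number))
    return 0

if __name__ == "__main__":
    q = multiprocessing.Queue()
    # The queue reaches the workers through the initializer arguments,
    # i.e. through process inheritance, not through Pool's task pickling,
    # so assert_spawning no longer complains.
    pool = multiprocessing.Pool(processes=4, initializer=init_worker,
                                initargs=(q,))
    pool.map(do_work, range(16))
    pool.close()
    pool.join()
    results = [q.get() for _ in range(16)]
    print(len(results))
```

The hack is the hidden global: each worker's do_work() depends on module state set up out of band, which is exactly why the answer calls it messy.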

The race condition comes from:

while not out_queue.empty():
    print "queue: ", out_queue.get()


When worker processes are filling your queue, you can hit a state where the queue is momentarily empty because a worker is just about to put something into it. If you check .empty() at that moment, you will end early. A better method is to put sentinel values in your queue to signal when you are finished putting data into it.
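A small sketch of that sentinel pattern (the name consume() and the use of None as the sentinel are illustrative choices, not from the original answer):

```python
SENTINEL = None  # marker meaning "no more data will arrive"

def consume(q):
    # Drain the queue until the sentinel shows up; never polls q.empty().
    results = []
    while True:
        item = q.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        results.append(item)
    return results
```

Each producer finishes by putting one sentinel per consumer into the queue, so a consumer blocked in q.get() wakes up and exits cleanly instead of racing against .empty().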

