Multiple queues from one multiprocessing Manager
Question
I'm writing a script that will use Python's multiprocessing and threading modules.
For context: I spawn as many processes as there are cores available, and inside each process I start, e.g., 25 threads.
Each thread consumes from an input_queue and produces to an output_queue.
For the queue objects I use multiprocessing.Queue.
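A minimal sketch of the setup described above, for orientation only. Assumptions not in the question: the work is a placeholder doubling step, `None` serves as a hypothetical STOP sentinel, and THREADS_PER_PROCESS is 4 rather than 25. Note that the parent collects every result before joining the worker processes, because a process that has put items on a multiprocessing.Queue does not exit until its feeder thread has flushed them.

```python
import multiprocessing
import os
import threading

THREADS_PER_PROCESS = 4  # assumption: the question uses e.g. 25; kept small here
STOP = None              # hypothetical sentinel meaning "no more work"


def worker_thread(in_queue, out_queue):
    # Consume from in_queue and produce to out_queue until a STOP arrives.
    while True:
        item = in_queue.get()
        if item is STOP:
            break
        out_queue.put(item * 2)  # placeholder work


def worker_process(in_queue, out_queue):
    # Start the threads inside this process and wait for all of them.
    threads = [threading.Thread(target=worker_thread, args=(in_queue, out_queue))
               for _ in range(THREADS_PER_PROCESS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def run(items, num_procs=None):
    num_procs = num_procs or os.cpu_count() or 1  # one process per core by default
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker_process, args=(in_queue, out_queue))
             for _ in range(num_procs)]
    for p in procs:
        p.start()
    for item in items:
        in_queue.put(item)
    # One sentinel per thread in every process, so each thread sees exactly one STOP.
    for _ in range(num_procs * THREADS_PER_PROCESS):
        in_queue.put(STOP)
    # Drain the results BEFORE joining; joining first can deadlock because the
    # workers' feeder threads may still hold unflushed items.
    results = [out_queue.get() for _ in items]
    for p in procs:
        p.join()
    return sorted(results)


if __name__ == "__main__":
    print(run(list(range(10))))
```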
After my first tests I got a deadlock because the thread responsible for feeding and flushing the Queue was hanging. After a while I found that I can use Queue().cancel_join_thread() to work around this problem.
But because that workaround can lose data, I would like to use multiprocessing.Manager().Queue() instead.
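A minimal sketch of why a Manager queue sidesteps the feeder-thread problem, assuming an arbitrary COUNT of 1000 items: each put() on a manager proxy is a synchronous call to the manager's server process, so by the time the producer's put() returns the item is already stored there. The producer therefore holds no unflushed buffer and can be joined before the queue is drained, an ordering that multiprocessing's own documentation warns may deadlock with a plain multiprocessing.Queue.

```python
from multiprocessing import Manager, Process

COUNT = 1000  # assumption: arbitrary number of items for the demo


def produce(queue, count):
    # Each put() is a round trip to the manager's server process.
    for i in range(count):
        queue.put(i)


def main():
    with Manager() as manager:  # the server process is shut down on exit
        queue = manager.Queue()
        producer = Process(target=produce, args=(queue, COUNT))
        producer.start()
        # Joining before draining is safe here: nothing is buffered in the producer.
        producer.join()
        return [queue.get() for _ in range(COUNT)]


if __name__ == "__main__":
    items = main()
    print(len(items), items[:5])  # prints: 1000 [0, 1, 2, 3, 4]
```

The trade-off is speed: every put() and get() goes through the manager process, which is usually slower than a plain multiprocessing.Queue.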
Now the actual question: is it better to use one Manager object per queue, or should I create one manager and get both queues from the same manager object?
# One manager for all queues
import multiprocessing
manager = multiprocessing.Manager()
input_queue = manager.Queue()
output_queue = manager.Queue()
# ...Magic...

# As many managers as queues
import multiprocessing
manager_in = multiprocessing.Manager()
queue_in = manager_in.Queue()
manager_out = multiprocessing.Manager()
queue_out = manager_out.Queue()
# ...Magic...
Thanks for your help.
Answer
There is no need to use two separate Manager objects. As you have already seen, the Manager object allows sharing objects among multiple processes; from the docs:
Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
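As a small illustration of the proxy mechanism the docs describe (a sketch, not taken from the original answer): a queue proxy can be pickled and passed as an ordinary argument to Pool workers, which reconnect to the same manager server process. Passing a plain multiprocessing.Queue through Pool.map the same way would typically fail with a RuntimeError, since such queues are meant to be shared only through inheritance. The function name square_into is hypothetical.

```python
from multiprocessing import Manager, Pool


def square_into(args):
    # args is a (queue proxy, value) pair; the proxy was pickled, sent to the
    # pool worker, and reconnects there to the manager's server process.
    out_queue, value = args
    out_queue.put(value * value)


def main():
    with Manager() as manager:
        out_queue = manager.Queue()
        with Pool(2) as pool:
            pool.map(square_into, [(out_queue, n) for n in range(5)])
        return sorted(out_queue.get() for _ in range(5))


if __name__ == "__main__":
    print(main())  # [0, 1, 4, 9, 16]
```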
Therefore if you have two different queues you can still use the same manager. In case it helps someone, here is a simple example using two queues with one manager:
from multiprocessing import Manager, Process
import time


class Worker(Process):
    """
    Simple worker.
    """
    def __init__(self, name, in_queue, out_queue):
        super(Worker, self).__init__()
        self.name = name
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            # grab work; do something to it (+1); then put the result on the output queue
            work = self.in_queue.get()
            print("{} got {}".format(self.name, work))
            work += 1
            # sleep to allow the other workers a chance (b/c the work action is too simple)
            time.sleep(1)
            # put the transformed work on the queue
            print("{} puts {}".format(self.name, work))
            self.out_queue.put(work)


if __name__ == "__main__":
    # construct the queues
    manager = Manager()
    inq = manager.Queue()
    outq = manager.Queue()

    # construct the workers
    workers = [Worker(str(name), inq, outq) for name in range(3)]
    for worker in workers:
        worker.start()

    # add data to the queue for processing
    work_len = 10
    for x in range(work_len):
        inq.put(x)

    while outq.qsize() != work_len:
        # waiting for workers to finish
        print("Waiting for workers. Out queue size {}".format(outq.qsize()))
        time.sleep(1)

    # clean up
    for worker in workers:
        worker.terminate()

    # print the outputs
    while not outq.empty():
        print(outq.get())
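A variant of the same one-manager, two-queue pattern that avoids polling qsize() and calling terminate(): each worker exits after reading a sentinel. This is a sketch under assumptions, not the answer's code: `None` is used as a hypothetical STOP value, and the printing and sleeping are dropped.

```python
from multiprocessing import Manager, Process

STOP = None  # hypothetical sentinel: tells a worker to exit


def worker(in_queue, out_queue):
    # Same +1 work as the Worker class, without printing or sleeping.
    while True:
        work = in_queue.get()
        if work is STOP:
            break
        out_queue.put(work + 1)


def main(num_workers=3, work_len=10):
    with Manager() as manager:  # one manager, two queues
        inq = manager.Queue()
        outq = manager.Queue()
        workers = [Process(target=worker, args=(inq, outq)) for _ in range(num_workers)]
        for w in workers:
            w.start()
        for x in range(work_len):
            inq.put(x)
        for _ in workers:
            inq.put(STOP)  # one sentinel per worker
        for w in workers:
            w.join()  # workers exit on their own once they read STOP
        return sorted(outq.get() for _ in range(work_len))


if __name__ == "__main__":
    print(main())  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```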
To use two managers instead, construct the queues like this:
# construct the queues
manager1 = Manager()
inq = manager1.Queue()
manager2 = Manager()
outq = manager2.Queue()
This works, but it is not necessary.
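One concrete cost of the two-manager version is an extra process: each Manager() starts its own server process, while creating more queues on an existing manager starts none. The sketch below checks this with multiprocessing.active_children(); the helper name count_server_processes is hypothetical, and the delta-based count assumes no other child processes start or exit during the call.

```python
import multiprocessing
from multiprocessing import Manager


def count_server_processes(num_managers, queues_per_manager):
    """Return (new child processes started, queues created)."""
    before = len(multiprocessing.active_children())
    managers = [Manager() for _ in range(num_managers)]
    queues = [m.Queue() for m in managers for _ in range(queues_per_manager)]
    # Each Manager() started one server process; the queues added none.
    started = len(multiprocessing.active_children()) - before
    for m in managers:
        m.shutdown()
    return started, len(queues)


if __name__ == "__main__":
    print(count_server_processes(1, 2))  # (1, 2): one server hosts both queues
    print(count_server_processes(2, 1))  # (2, 2): one server per queue
```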