在无限循环中停止连接到队列的python多处理工作器的最干净方法是什么? [英] What is the cleanest way to stop a python multiprocessing worker attached to a queue in an infinite loop?
问题描述
我正在使用multiprocessing.Pool
和multiprocessing.Queue
在python中实现生产者-消费者模式.使用者是使用gevent
派生多个任务的预分支过程.
I'm implementing a producer-consumer pattern in python using multiprocessing.Pool
and multiprocessing.Queue
. Consumers are pre-forked processes that uses gevent
to spawn multiple tasks.
这是代码的精简版:
import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time
# Task queue
queue = Queue()
def init_worker ():
# Ignore signals in worker
signal.signal( signal.SIGTERM, signal.SIG_IGN )
signal.signal( signal.SIGINT, signal.SIG_IGN )
signal.signal( signal.SIGQUIT, signal.SIG_IGN )
# One of the worker task
def worker_task1( ):
while True:
try:
m = queue.get( timeout = 2 )
# Break out if producer says quit
if m == 'QUIT':
print 'TIME TO QUIT'
break
except QueueEmpty:
pass
# Worker
def work( ):
gevent.joinall([
gevent.spawn( worker_task1 ),
])
pool = Pool( 2, init_worker )
for i in xrange( 2 ):
pool.apply_async( work )
try:
while True:
queue.put( 'Some Task' )
time.sleep( 2 )
except KeyboardInterrupt as e:
print 'STOPPING'
# Signal all workers to quit
for i in xrange( 2 ):
queue.put( 'QUIT' )
pool.join()
现在当我尝试退出它时,我得到以下状态:
Now when I try to quit it, I get following state:
- 父进程正在等待其中一个孩子加入.
- 其中一个子项处于失效状态.如此完成,但父母正在等待其他孩子完成.
- 其他孩子正在显示:
futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ...
.
那么干净地结束这一过程的正确方法是什么?
So what is the correct way to end such a process cleanly?
推荐答案
我发现了问题所在.根据 multiprocessing.Pool.join()
的文档,pool
必须为close()ed
,然后才能为join()ed
.在pool.join()
之前添加pool.close()
可以解决问题.
I figured out the problem. According to documentation for multiprocessing.Pool.join()
, pool
needs to be close()ed
before it can be join()ed
. Adding pool.close()
before pool.join()
solved the problem.
这篇关于在无限循环中停止连接到队列的python多处理工作器的最干净方法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!