在无限循环中停止连接到队列的python多处理工作器的最干净方法是什么? [英] What is the cleanest way to stop a python multiprocessing worker attached to a queue in an infinite loop?

查看:128
本文介绍了在无限循环中停止连接到队列的python多处理工作器的最干净方法是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用multiprocessing.Poolmultiprocessing.Queue在python中实现生产者-消费者模式.使用者是使用gevent派生多个任务的预分支过程.

I'm implementing a producer-consumer pattern in python using multiprocessing.Pool and multiprocessing.Queue. Consumers are pre-forked processes that uses gevent to spawn multiple tasks.

这是代码的精简版:

import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time

# Task queue
queue = Queue()

def init_worker ():
    # Ignore signals in worker
    signal.signal( signal.SIGTERM, signal.SIG_IGN )
    signal.signal( signal.SIGINT, signal.SIG_IGN )
    signal.signal( signal.SIGQUIT, signal.SIG_IGN )

# One of the worker task
def worker_task1( ):
    while True:
        try:
            m = queue.get( timeout = 2 )

            # Break out if producer says quit
            if m == 'QUIT':
                print 'TIME TO QUIT'
                break

        except QueueEmpty:
            pass

# Worker
def work( ):
    gevent.joinall([
        gevent.spawn( worker_task1 ),
    ])

pool = Pool( 2, init_worker )
for i in xrange( 2 ):
    pool.apply_async( work )

try:
    while True:
        queue.put( 'Some Task' )
        time.sleep( 2 )

except KeyboardInterrupt as e:
    print 'STOPPING'

    # Signal all workers to quit
    for i in xrange( 2 ):
        queue.put( 'QUIT' )

    pool.join()

现在当我尝试退出它时,我得到以下状态:

Now when I try to quit it, I get following state:

  1. 父进程正在等待其中一个孩子加入.
  2. 其中一个子项处于失效状态.如此完成,但父母正在等待其他孩子完成.
  3. 其他孩子正在显示:futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ....

那么干净地结束这一过程的正确方法是什么?

So what is the correct way to end such a process cleanly?

推荐答案

我发现了问题所在.根据 multiprocessing.Pool.join() 的文档,pool必须为close()ed,然后才能为join()ed.在pool.join()之前添加pool.close()可以解决问题.

I figured out the problem. According to documentation for multiprocessing.Pool.join(), pool needs to be close()ed before it can be join()ed. Adding pool.close() before pool.join() solved the problem.

这篇关于在无限循环中停止连接到队列的python多处理工作器的最干净方法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆