producer/consumer problem with python multiprocessing

Problem description

I am writing a server program with one producer and multiple consumers. What confuses me is that only the first task the producer puts into the queue gets consumed; tasks enqueued after that are never consumed and remain in the queue forever.

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:  # sentinel received: shut down
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)  # re-queue the sentinel so the next worker exits too

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()

Manager().start()

The producer is an HTTP server that puts a task into the queue whenever it receives a request from a user. It seems that the consumer processes remain blocked even when there are new tasks in the queue, which is weird.

P.S. Two more questions, unrelated to the above. First, I am not sure whether it is better to put the HTTP server in its own process rather than in the main process; if so, how can I keep the main process running until all child processes end? Second, what is the best way to stop the HTTP server gracefully?
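
For the first P.S. question, one common layout is to run the HTTP server in a child process as well, so the main process only supervises and stays alive until everything ends. Below is a minimal sketch under that assumption, reusing httpserv and work from the snippets above; note that terminate() is a blunt way to stop fapws's event loop, and a truly graceful HTTP shutdown would need support from fapws itself:

from multiprocessing import Process, Queue, cpu_count

def main():
    queue = Queue()
    # producer in its own process, not in the main process
    server = Process(target=httpserv, args=(queue,))
    server.start()
    workers = [Process(target=work, args=(queue,))
               for i in xrange(cpu_count())]
    for w in workers:
        w.start()
    try:
        server.join()        # keeps the main process alive
    except KeyboardInterrupt:
        server.terminate()   # not graceful: fapws's loop is simply killed
        queue.put(None)      # one sentinel cascades through all workers
        for w in workers:
            w.join()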

Edit: adding the producer code; it is just a simple Python WSGI server:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()
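
With the server running, each route can be exercised with a plain HTTP GET. For example, a quick smoke test against the two callbacks above (the URLs follow from the evwsgi.start and wsgi_cb calls; this test script is not part of the original post):

import urllib2

# each request returns immediately; the enqueued task is handled
# by a worker process in the background
print urllib2.urlopen("http://localhost:8080/request_1").read()
print urllib2.urlopen("http://localhost:8080/request_2").read()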

Answer

I think there must be something wrong with the web server part, as this works perfectly:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    # stand-in producer: enqueue a random task every 10 ms
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)  # pass the sentinel on so every worker exits


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()


Manager().start()

Sample output:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1
