producer/consumer problem with python multiprocessing


Question

I am writing a server program with one producer and multiple consumers. What confuses me is that only the first task the producer puts into the queue gets consumed; tasks enqueued after that are no longer consumed and remain in the queue forever.

from multiprocessing import Process, Queue, cpu_count
from http import httpserv  # the asker's own producer module, shown below
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print("task done:", task)
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in range(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()

Manager().start()

The producer is an HTTP server which puts a task into the queue each time it receives a request from a user. It seems that the consumer processes are still blocked even when there are new tasks in the queue, which is weird.

P.S. Two more questions, not related to the above. I am not sure whether it is better to put the HTTP server in its own process rather than in the main process; if so, how can I keep the main process running until all child processes end? Second question: what is the best way to stop the HTTP server gracefully?

Edit: adding the producer code; it's just a simple Python WSGI server:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()

Answer

I think there must be something wrong with the web server part, as this works perfectly:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print("%d task:" % id, task)
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print("starting %d workers" % self.NUMBER_OF_PROCESSES)
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in range(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()


Manager().start()

Sample output:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1
