Multiprocessing - producer/consumer design


Question

I'm using the multiprocessing module to split up a very large task. It works for the most part, but I must be missing something obvious with my design, because this way it's very hard for me to effectively tell when all of the data has been processed.

I have two separate tasks that run; one that feeds the other. I guess this is a producer/consumer problem. I use a shared Queue between all processes, where the producers fill up the queue, and the consumers read from the queue and do the processing. The problem is that there is a finite amount of data, so at some point everyone needs to know that all of the data has been processed so the system can shut down gracefully.

It would seem to make sense to use the map_async() function, but since the producers are filling up the queue, I don't know all of the items up front, so I have to go into a while loop and use apply_async() and try to detect when everything is done with some sort of timeout...ugly.

I feel like I'm missing something obvious. How can this be better designed?

PRODUCER

import multiprocessing
import time

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        for record in get_records_for_item(self.item): # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    running = []

    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)

CONSUMER

import Queue  # Python 2 module providing the Queue.Empty exception ("queue" on Python 3)

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an item
            # if new records don't arrive in 10 seconds, process what you have
            # and let the next process pick up more items.

            record = queue.get(True, 10)
        except Queue.Empty:                
            break

        do_stuff_with_record(record)

MAIN

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    producer_queue = manager.Queue()

    producer_items = xrange(0,10)

    for item in producer_items:
        producer_queue.put(item)

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
    p.start()

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)

Here is where it gets cheesy. I can't use map, because the list to consume is being filled up at the same time. So I have to go into a while loop and try to detect a timeout. The consumer_queue can become empty while the producers are still trying to fill it up, so I can't just detect an empty queue and quit on that.

    chunksize = 10000  # how many records each consumer task pulls before returning
    timed_out = False
    timeout = 1800
    while 1:
        try:
            result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue, ), dict(chunksize=chunksize,))
            if timed_out:
                timed_out = False

        except Queue.Empty:
            if timed_out:
                break

            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()

I thought that maybe I could get() the records in the main thread and pass those into the consumers instead of passing the queue in, but I think I'd end up with the same problem that way. I still have to run a while loop and use apply_async(). Thank you in advance for any advice!

Solution

You could use a manager.Event to signal the end of the work. This event can be shared between all of your processes, and when you signal it from your main process the other workers can shut down gracefully.

while not event.is_set():
    # ... rest of the consumer code ...

So, your consumers would wait for the event to be set and handle the cleanup once it is set.

To determine when to set this flag, you can join on the producer processes; when those are all complete, you can set the event and then join on the consumer processes.
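
Below is a minimal sketch of that shutdown sequence. The produce() and consume() workers are hypothetical stand-ins for the question's ProducerProcess and process_consumer_chunk, and trivial stand-ins are included for its get_records_for_item() and do_stuff_with_record() helpers so the example runs on its own:

import multiprocessing
import Queue  # Python 2 name; the module is "queue" on Python 3

def get_records_for_item(item):      # from the question; trivial stand-in here
    return [(item, n) for n in range(3)]

def do_stuff_with_record(record):    # from the question; trivial stand-in here
    print(record)

def produce(item, work_queue):
    # Stand-in for ProducerProcess.run(): feed every record for one item.
    for record in get_records_for_item(item):
        work_queue.put(record)

def consume(work_queue, done_event):
    # Keep draining the queue; only treat "empty" as terminal once the
    # producers are done, otherwise a momentarily empty queue would shut
    # a consumer down too early.
    while True:
        try:
            record = work_queue.get(timeout=1)
        except Queue.Empty:
            if done_event.is_set():
                break      # producers are done and nothing is left
            continue       # producers still running; keep polling
        do_stuff_with_record(record)

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    work_queue = manager.Queue()
    done_event = manager.Event()

    producers = [multiprocessing.Process(target=produce, args=(item, work_queue))
                 for item in range(10)]
    consumers = [multiprocessing.Process(target=consume, args=(work_queue, done_event))
                 for _ in range(4)]
    for w in producers + consumers:
        w.start()

    for p in producers:    # wait until every producer has fed the queue
        p.join()
    done_event.set()       # tell the consumers no more data is coming

    for c in consumers:    # consumers drain the remainder, see the event, exit
        c.join()

The key point is that a consumer only treats an empty queue as terminal after the event is set, so records put on the queue just before the last producer exits are still consumed.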

