Avoiding race conditions in Python 3's multiprocessing Queues

Problem Description

I'm trying to find the maximum weight of about 6.1 billion (custom) items and I would like to do this with parallel processing. For my particular application there are better algorithms that don't require my iterating over 6.1 billion items, but the textbook that explains them is over my head and my boss wants this done in 4 days. I figured I have a better shot with my company's fancy server and parallel processing. However, everything I know about parallel processing comes from reading the Python documentation. Which is to say I'm pretty lost...

My current theory is to set up a feeder process, an input queue, a whole bunch (say, 30) of worker processes, and an output queue (finding the maximum element in the output queue will be trivial). What I don't understand is how the feeder process can tell the worker processes when to stop waiting for items to come through the input queue.

I had thought about using multiprocessing.Pool.map_async on my iterable of 6.1E9 items, but it takes nearly 10 minutes just to iterate through the items without doing anything to them. Unless I'm misunderstanding something..., having map_async iterate through them to assign them to processes could be done while the processes begin their work. (Pool also provides imap but the documentation says it's similar to map, which doesn't appear to work asynchronously. I want asynchronous, right?)
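
For what it's worth, imap and imap_unordered are in fact lazy: they pull items from the input iterable in chunks while results stream back, so the feeding and the work overlap. A minimal sketch of that approach, with a hypothetical weight_of helper standing in for the custom items' weight attribute:

import multiprocessing as mp

def weight_of(item):
    """Hypothetical stand-in: return one item's weight.
    Here the items are plain ints, so each item is its own weight."""
    return item

if __name__ == '__main__':
    items = range(10**6)  # stand-in for the real 6.1e9-item iterable
    with mp.Pool(processes=4) as pool:
        # imap_unordered hands items to workers in chunks while results
        # stream back, so iteration and computation overlap.
        heaviest = max(pool.imap_unordered(weight_of, items, chunksize=10000))
    print(heaviest)  # -> 999999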

Related questions: Do I want to use concurrent.futures instead of multiprocessing? I couldn't be the first person to implement a two-queue system (that's exactly how the lines at every deli in America work...) so is there a more Pythonic/built-in way to do this?

Here's a skeleton of what I'm trying to do. See the comment block in the middle.

import multiprocessing as mp
import queue

def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items"""
    for item in items:
        bathtub.put(item)
    bathtub.close()

def drain_filter(bathtub, drain):
    """Put maximal item from bathtub into drain.
    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try:
            current_item = bathtub.get()
        # The following three lines are the ones that I can't
        # quite figure out how to trigger without a race condition.
        # What I would love is to trigger them AFTER faucet calls
        # bathtub.close and the bathtub queue is empty.
        except queue.Empty:
            drain.put((max_weight, max_item))
            return
        else:
            bathtub.task_done()
        if not current_item.is_relevant():
            continue
        current_weight = current_item.weight
        if current_weight > max_weight:
            max_weight = current_weight
            max_item = current_item

def parallel_max(items, nprocs=30):
    """The elements of items should have a method `is_relevant`
    and an attribute `weight`. `items` itself is an immutable
    iterator object.
    """
    bathtub_q = mp.JoinableQueue()
    drain_q = mp.Queue()

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
    worker_procs = mp.Pool(processes=nprocs)

    faucet_proc.start()
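    # NOTE: a multiprocessing Queue cannot be pickled, so handing bathtub_q
    # and drain_q to Pool workers as arguments raises a RuntimeError; queues
    # must be shared through inheritance. A working rewrite using plain
    # Process workers appears after the answer below.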
    for _ in range(nprocs):
        worker_procs.apply_async(drain_filter, args=(bathtub_q, drain_q))

    finalists = []
    for i in range(nprocs):
        finalists.append(drain_q.get())

    return max(finalists)


I found a very thorough answer to my question, and a gentle introduction to multitasking, from Python Foundation communications director Doug Hellmann. What I wanted was the "poison pill" pattern. Check it out here: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

Props to @MRAB for posting the kernel of that concept.

Recommended Answer

You could put a special terminating item, such as None, into the queue. When a worker sees it, it can put it back for the other workers to see, and then terminate. Alternatively, you could put one special terminating item per worker into the queue.
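
As a concrete illustration, here is a minimal runnable sketch of the one-sentinel-per-worker variant, with items modeled as plain (weight, payload) tuples rather than the asker's custom objects:

import multiprocessing as mp

SENTINEL = None  # the "poison pill": a value no real item can equal

def faucet(items, bathtub, nprocs):
    """Fill bathtub, then add one sentinel per worker so that each
    worker sees exactly one and knows it is safe to stop."""
    for item in items:
        bathtub.put(item)
    for _ in range(nprocs):
        bathtub.put(SENTINEL)

def drain_filter(bathtub, drain):
    """Track a local maximum until a sentinel arrives, then report it."""
    max_weight, max_item = 0, None
    while True:
        item = bathtub.get()  # blocks, so there is no Empty race
        if item is SENTINEL:
            drain.put((max_weight, max_item))
            return
        weight, payload = item  # items modeled as (weight, payload) pairs
        if weight > max_weight:
            max_weight, max_item = weight, payload

def parallel_max(items, nprocs=4):
    bathtub = mp.Queue()
    drain = mp.Queue()
    workers = [mp.Process(target=drain_filter, args=(bathtub, drain))
               for _ in range(nprocs)]
    for w in workers:
        w.start()
    faucet(items, bathtub, nprocs)
    finalists = [drain.get() for _ in range(nprocs)]
    for w in workers:
        w.join()
    return max(finalists)

if __name__ == '__main__':
    data = ((weight, 'item-%d' % weight) for weight in range(1000))
    print(parallel_max(data))  # -> (999, 'item-999')

Because get() blocks until an item or a sentinel arrives, no worker can ever observe an empty queue while the feeder is still filling it, which is exactly the race the question describes.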
