如何使用multiprocessing.Queue.get方法? [英] How to use multiprocessing.Queue.get method?

查看:391
本文介绍了如何使用multiprocessing.Queue.get方法?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面的代码将三个数字放在一个队列中.然后,它尝试从队列中取回数字.但是它永远不会.如何从队列中获取数据?

The code below places three numbers in a queue. Then it attempts to get the numbers back from the queue. But it never does. How to get the data from the queue?

import multiprocessing

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while not queue.empty():
    print queue.get()

推荐答案

在阅读@Martijn Pieters'之后,我最初删除了此答案,因为他更详细地描述了为什么不起作用".然后 我意识到,OP的示例中的用例与

I originally deleted this answer after I read @Martijn Pieters', since he decribed the "why this doesn't work" in more detail and earlier. Then I realized, that the use case in OP's example doesn't quite fit to the canonical sounding title of

如何使用multiprocessing.Queue.get方法".

"How to use multiprocessing.Queue.get method".

那不是因为有 没有子进程用于演示,但是因为在实际应用程序中几乎没有预先填充队列,而是在读取之后才读取,但是读取 并在等待时间之间交错地进行写作. Martijn展示的扩展演示代码在通常情况下不起作用,因为当入队不符合阅读要求时,while循环可能会中断得太早.因此,这里是重新加载的答案,它可以处理通常的交错供稿&读取方案:

That's not because there's no child process involved for demonstration, but because in real applications hardly ever a queue is pre-filled and only read out after, but reading and writing happens interleaved with waiting times in between. The extended demonstration code Martijn showed, wouldn't work in the usual scenarios, because the while loop would break too soon when enqueuing doesn't keep up with reading. So here is the answer reloaded, which is able to deal with the usual interleaved feeds & reads scenarios:

不要依赖queue.empty检查同步.

Don't rely on queue.empty checks for synchronization.

将对象放入空队列后,在队列的empty()方法返回False且get_nowait()可以不增加queue.Empty的情况下返回之前,可能会有无穷的延迟. ...

After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty. ...

空()

如果队列为空,则返回True,否则返回False.由于多线程/多处理语义,这是不可靠的. 文档

Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable. docs

从队列中使用for msg in iter(queue.get, sentinel):.get(),您都可以通过传递哨兵值来打破循环... iter(可呼叫,前哨)?

Either use for msg in iter(queue.get, sentinel): to .get() from the queue, where you break out of the loop by passing a sentinel value...iter(callable, sentinel)?

from multiprocessing import Queue

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    for msg in iter(queue.get, SENTINEL):
        print(msg)

...或者如果需要非阻塞解决方案,请使用get_nowait()并处理可能的queue.Empty异常.

...or use get_nowait() and handle a possible queue.Empty exception if you need a non-blocking solution.

from multiprocessing import Queue
from queue import Empty
import time

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    while True:
        try:
            msg = queue.get_nowait()
            if msg == SENTINEL:
                break
            print(msg)
        except Empty:
            # do other stuff
            time.sleep(0.1)

如果只有一个进程且该进程中只有一个线程正在读取队列,则还可以与以下命令交换最后一个代码段:

In case only one process and only one thread within this process is reading the queue, it would be also possible to exchange the last code snippet with:

while True:
    if not queue.empty():  # this is not an atomic operation ...
        msg = queue.get()  # ... thread could be interrupted in between
        if msg == SENTINEL:
            break
        print(msg)
    else:
        # do other stuff
        time.sleep(0.1)

因为线程可以在检查if not queue.empty()queue.get()之间删除 GIL ,这不适用于进程中的多线程队列读取.如果从队列中读取多个进程,则同样适用.

Since a thread could drop the GIL in between checking if not queue.empty() and queue.get(), this wouldn't be suitable for multi-threaded queue-reads in a process. The same applies if multiple processes are reading from the queue.

对于单一生产者/单一消费者的情况,使用multiprocessing.Pipe代替multiprocessing.Queue就足够了,并且性能更高.

For single-producer / single-consumer scenarios, using a multiprocessing.Pipe instead of multiprocessing.Queue would be sufficient and more performant, though.

这篇关于如何使用multiprocessing.Queue.get方法?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆