将multiprocessing.Queue转储到列表中 [英] Dumping a multiprocessing.Queue into a list
问题描述
我希望将multiprocessing.Queue
转储到列表中.为此,我编写了以下函数:
I wish to dump a multiprocessing.Queue
into a list. For that task I've written the following function:
import Queue
def dump_queue(queue):
"""
Empties all pending items in a queue and returns them in a list.
"""
result = []
# START DEBUG CODE
initial_size = queue.qsize()
print("Queue has %s items initially." % initial_size)
# END DEBUG CODE
while True:
try:
thing = queue.get(block=False)
result.append(thing)
except Queue.Empty:
# START DEBUG CODE
current_size = queue.qsize()
total_size = current_size + len(result)
print("Dumping complete:")
if current_size == initial_size:
print("No items were added to the queue.")
else:
print("%s items were added to the queue." % \
(total_size - initial_size))
print("Extracted %s items from the queue, queue has %s items \
left" % (len(result), current_size))
# END DEBUG CODE
return result
但是由于某些原因,它不起作用.
But for some reason it doesn't work.
观察以下shell会话:
Observe the following shell session:
>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
... q.put([range(200) for j in range(100)])
...
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>>
这是怎么回事?为什么不是所有物品都被丢弃?
What's happening here? Why aren't all the items being dumped?
推荐答案
尝试一下:
import Queue
import time
def dump_queue(queue):
"""
Empties all pending items in a queue and returns them in a list.
"""
result = []
for i in iter(queue.get, 'STOP'):
result.append(i)
time.sleep(.1)
return result
import multiprocessing
q = multiprocessing.Queue()
for i in range(100):
q.put([range(200) for j in range(100)])
q.put('STOP')
l=dump_queue(q)
print len(l)
多处理队列具有一个内部缓冲区,该缓冲区具有供稿器线程,该线程可以将工作从缓冲区中拉出并将其刷新到管道.如果不是所有的对象都被刷新,我可能会看到一个过早引发Empty的情况.使用哨兵指示队列的结束是安全的(并且是可靠的).另外,使用iter(get,sentinel)习惯比依赖Empty更好.
Multiprocessing queues have an internal buffer which has a feeder thread which pulls work off a buffer and flushes it to the pipe. If not all of the objects have been flushed, I could see a case where Empty is raised prematurely. Using a sentinel to indicate the end of the queue is safe (and reliable). Also, using the iter(get, sentinel) idiom is just better than relying on Empty.
我不喜欢由于刷新时间而将其清空(我添加了time.sleep(.1)以便允许上下文切换到供稿器线程,您可能不需要它,没有它就可以工作-它是释放GIL的习惯.
I don't like that it could raise empty due to flushing timing (I added the time.sleep(.1) to allow a context switch to the feeder thread, you may not need it, it works without it - it's a habit to release the GIL).
这篇关于将multiprocessing.Queue转储到列表中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!