Iterable multiprocessing Queue not exiting

Question
import multiprocessing.queues as queues
import multiprocessing

class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0

    def __iter__(self):
        return self

    def put(self, obj, block=True, timeout=None):
        super(I, self).put(obj, block, timeout)
        self.length += 1

    def get(self, block=True, timeout=None):
        self.length -= 1
        return super(I, self).get(block, timeout)

    def __len__(self):
        return self.length

    def next(self):
        item = self.get()
        if item == 'Done':
            raise StopIteration
        return item

def thisworker(item):
    print 'got this item: %s' % item
    return item

q = I()
q.put(1)
q.put('Done')
the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)
I'm trying to create an iterable queue to use with a multiprocessing pool's map. The idea is that the function thisworker would append some items to the queue until a condition is met, and then exit after putting 'Done' in the queue (I haven't done that in this code yet).

But this code never completes; it always hangs. I'm not able to debug the real cause. Requesting your help.
PS: I've used self.length because the map_async method called from under the_pool.map requires the length of the iterable to compute chunksize, which is used to dispatch tasks to the pool.
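For context, CPython's map_async derives that chunksize from the iterable's length roughly like this (a simplified sketch; the helper name default_chunksize is illustrative, not part of the stdlib):

```python
# Simplified sketch of how Pool's map_async computes its default chunksize
# from len(iterable); the function name here is illustrative only.
def default_chunksize(n_items, n_workers):
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(2, 1))    # the queue above: 2 items, 1 worker -> 1
print(default_chunksize(100, 4))  # 100 items across 4 workers -> 7
```

This is why the pool needs len() on the iterable: without an accurate length it cannot size the chunks or know how many results to wait for.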
Answer
The problem is that you're treating 'Done' as a special-case item in the Queue, one that indicates the iteration should stop. So if you iterate over the Queue in your example with a for loop, all that will be returned is 1. However, you're claiming that the length of the Queue is 2. This confuses the map code, which relies on that length accurately representing the number of items in the iterable, so that it knows when all the results have come back from the workers:
class MapResult(ApplyResult):
    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        ...
        # _number_left is used to know when the MapResult is done
        self._number_left = length//chunksize + bool(length % chunksize)
So, you need to make the length actually accurate. You could do that a few ways, but I would recommend not requiring a sentinel to be loaded into the Queue at all, and using get_nowait instead:
import multiprocessing.queues as queues
import multiprocessing
from Queue import Empty

class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0

    ... <snip>

    def next(self):
        try:
            item = self.get_nowait()
        except Empty:
            raise StopIteration
        return item

def thisworker(item):
    print 'got this item: %s' % item
    return item

q = I()
q.put(1)
the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)
Also, note that this approach isn't process-safe. The length attribute will only be correct if you only put into the Queue from a single process, and never put again after sending the Queue to a worker process. It also won't work in Python 3 without adjusting the imports and implementation, because the constructor of multiprocessing.queues.Queue has changed.
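If you do want the subclassing approach on Python 3, the base constructor now takes a keyword-only ctx argument, so super().__init__ must pass a context explicitly, and the iterator hook is __next__ rather than next. A minimal sketch (the class name IterableQueue is mine):

```python
import multiprocessing
import multiprocessing.queues
from queue import Empty  # the 'Queue' module was renamed 'queue' in Python 3


class IterableQueue(multiprocessing.queues.Queue):
    def __init__(self, maxsize=0):
        # Python 3 requires an explicit context for the base constructor
        super().__init__(maxsize, ctx=multiprocessing.get_context())

    def __iter__(self):
        return self

    def __next__(self):  # Python 3 spells the iterator hook __next__
        try:
            return self.get_nowait()
        except Empty:
            raise StopIteration


q = IterableQueue()
q.put(1)
```

The same process-safety caveat about a hand-maintained length would still apply if you added one here.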
Instead of subclassing multiprocessing.queues.Queue, I would recommend using the iter built-in to iterate over the Queue:
q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put(None) # None is our sentinel, you could use 'Done', if you wanted
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned
This works on all versions of Python, is much less code, and is process-safe.
Based on the requirements you mentioned in the comment on my answer, I think you're better off using imap instead of map, so that you don't need to know the length of the Queue at all. The reality is that you can't accurately determine it, and in fact the length may end up growing as you iterate. If you use imap exclusively, then something similar to your original approach will work fine:
import multiprocessing

class I(object):
    def __init__(self, maxsize=0):
        self.q = multiprocessing.Queue(maxsize)

    def __getattr__(self, attr):
        if hasattr(self.q, attr):
            return getattr(self.q, attr)

    def __iter__(self):
        return self

    def next(self):
        item = self.q.get()
        if item == 'Done':
            raise StopIteration
        return item

def thisworker(item):
    if item == 1:
        q.put(3)
    if item == 2:
        q.put('Done')
    print 'got this item: %s' % item
    return item

q = I()
q.put(1)
q.put(2)
q.put(5)
the_pool = multiprocessing.Pool(2)  # 2 workers
print list(the_pool.imap(thisworker, q))
Output:
got this item: 1
got this item: 5
got this item: 3
got this item: 2
[1, 2, 5, 3]
I got rid of the code that worried about the length, and used delegation instead of inheritance, for better Python 3.x compatibility.
Note that my original suggestion, using iter(q.get, <sentinel>), still works here too, as long as you use imap instead of map.