Iterable multiprocessing Queue not exiting


Question

import multiprocessing.queues as queues
import multiprocessing
class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0 

    def __iter__(self):
        return self

    def put(self, obj, block=True, timeout=None):
        super(I, self).put(obj,block,timeout)
        self.length += 1

    def get(self, block = True, timeout = None):
        self.length -= 1
        return super(I, self).get(block, timeout)

    def __len__(self):
        return self.length

    def next(self):
        item = self.get()
        if item == 'Done':
            raise StopIteration
        return item


def thisworker(item):
    print 'got this item: %s' % item
    return item

q=I()

q.put(1)
q.put('Done')

the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)

I'm trying to create an iterable queue to use with the multiprocessing pool's map. The idea is that the function thisworker appends items to the queue until a condition is met, then puts 'Done' in the queue and exits (I haven't implemented that part in this code yet).

However, this code never completes; it always hangs.

I haven't been able to find the real cause. Any help would be appreciated.

PS: I use self.length because the map_async method called by the_pool.map needs the length of the iterable to compute chunksize, which is used to split the tasks into chunks for the pool.

Recommended answer

The problem is that you're treating 'Done' as a special-case item in the Queue that signals the iteration should stop. So, if you iterate over the Queue with a for loop in your example, the only item returned is 1. However, you're claiming that the length of the Queue is 2. This breaks the map code, which relies on that length to accurately represent the number of items in the iterable so that it knows when all the results have come back from the workers:

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        ...
        # _number_left is used to know when the MapResult is done
        self._number_left = length//chunksize + bool(length % chunksize)
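The mismatch can be reproduced without a pool. The following is a minimal sketch in Python 3, not the pool's actual code: FakeQueue and chunks_expected are hypothetical names, chunks_expected copies the ceiling division from the excerpt above, and chunksize is assumed to be 1.

```python
def chunks_expected(length, chunksize):
    # Same ceiling division as _number_left in the excerpt above.
    return length // chunksize + bool(length % chunksize)


class FakeQueue:
    """Hypothetical stand-in for the question's class I.

    len() counts every item that was put, sentinel included,
    but iteration stops as soon as the sentinel is reached.
    """

    def __init__(self, items):
        self._items = list(items)

    def __len__(self):
        return len(self._items)

    def __iter__(self):
        # Two-argument iter(): call the function until it returns 'Done'.
        return iter(lambda: self._items.pop(0), 'Done')


q = FakeQueue([1, 'Done'])
chunksize = 1  # assumed value for this sketch

expected = chunks_expected(len(q), chunksize)  # what the result object waits for
produced = len(list(q))                        # tasks that iteration really yields

print('expected %d chunks, produced %d' % (expected, produced))
```

With one more chunk expected than will ever be produced, the counter never reaches zero, which is consistent with the hang described in the question.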

So, you need to make the length accurate. There are a few ways to do that, but I would recommend not loading a sentinel into the Queue at all and using get_nowait instead:

import multiprocessing.queues as queues
import multiprocessing
from Queue import Empty

class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0 

    ... <snip>

    def next(self):
        try:
            item = self.get_nowait()
        except Empty:
            raise StopIteration
        return item


def thisworker(item):
    print 'got this item: %s' % item
    return item

q=I()

q.put(1)

the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)

Also, note that this approach isn't process-safe. The length attribute will only be correct if you put into the Queue from a single process and never put again after sending the Queue to a worker process. It also won't work in Python 3 without adjusting the imports and implementation, because the constructor for multiprocessing.queues.Queue has changed.

Instead of subclassing multiprocessing.queues.Queue, I would recommend using the iter built-in to iterate over the Queue:

q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put(None)  # None is our sentinel, you could use 'Done', if you wanted
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned

This works on all versions of Python, is much less code, and is process-safe.
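To see the two-argument form of iter in isolation, here is a small Python 3 sketch. It assumes a thread-level queue.Queue as a stand-in for multiprocessing.Queue so that it runs in a single process; both expose a blocking get().

```python
import queue

# Assumption: queue.Queue stands in for multiprocessing.Queue here.
q = queue.Queue()
for item in (1, 2, None):  # None is the sentinel
    q.put(item)

# iter(callable, sentinel) calls q.get() repeatedly and stops
# when the returned value equals the sentinel.
items = list(iter(q.get, None))
print(items)
```

The sentinel itself is consumed but never yielded, so the queue is empty afterwards and no custom iterator class is needed.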

Based on the requirements you mentioned in the comment on my answer, I think you're better off using imap instead of map, so that you don't need to know the length of the Queue at all. The reality is that you can't determine the length accurately, and it may even grow while you're iterating. If you use imap exclusively, then something similar to your original approach will work fine:

import multiprocessing

class I(object):
    def __init__(self, maxsize=0):
        self.q = multiprocessing.Queue(maxsize)

    def __getattr__(self, attr):
        if hasattr(self.q, attr):
            return getattr(self.q, attr)

    def __iter__(self):
        return self

    def next(self):
        item = self.q.get()
        if item == 'Done':
            raise StopIteration
        return item


def thisworker(item):
    if item == 1:
        q.put(3)
    if item == 2:
        q.put('Done')
    print 'got this item: %s' % item
    return item

q=I()

q.put(1)
q.put(2)
q.put(5)

the_pool = multiprocessing.Pool(2)  # 2 workers
print list(the_pool.imap(thisworker, q))

Output:

got this item: 1
got this item: 5
got this item: 3
got this item: 2
[1, 2, 5, 3]

I got rid of the code that tracked the length, and used delegation instead of inheritance for better Python 3.x compatibility.
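For readers on Python 3, a delegating wrapper along these lines might look like the sketch below. It is a hypothetical variation, not the code from the answer: the class name IterableQueue, the injected queue argument, and the sentinel parameter are assumptions. To stay in a single process, it wraps a queue.Queue and uses the lazy built-in map as a stand-in for the_pool.imap.

```python
import queue


class IterableQueue:
    """Hypothetical Python 3 port of the delegating wrapper."""

    def __init__(self, q, sentinel='Done'):
        self.q = q
        self.sentinel = sentinel

    def __getattr__(self, attr):
        # Only called for names not found on the wrapper itself.
        if attr == 'q':
            # Avoid infinite recursion if self.q was never set.
            raise AttributeError(attr)
        return getattr(self.q, attr)

    def __iter__(self):
        return self

    def __next__(self):  # Python 3 spelling of next()
        item = self.q.get()
        if item == self.sentinel:
            raise StopIteration
        return item


# Assumption: queue.Queue replaces multiprocessing.Queue for this sketch.
q = IterableQueue(queue.Queue())


def thisworker(item):
    # Same logic as the worker above: the queue grows while it is consumed.
    if item == 1:
        q.put(3)
    if item == 2:
        q.put('Done')
    return item


for item in (1, 2, 5):
    q.put(item)

# The built-in map is lazy, like imap: it pulls one item at a time.
results = list(map(thisworker, q))
print(results)
```

The queue holds three items before iteration starts, yet four results come back, which is why no length reported up front could have been correct. With a real pool the workers run concurrently, so the order in which items are processed may differ.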

Note that my original suggestion, to use iter(q.get, <sentinel>), still works here too, as long as you use imap instead of map.
