通讯结束 [英] Communicating end of Queue
问题描述
我正在学习使用Queue模块,对如何使队列使用者线程知道队列已完成感到有些困惑.理想情况下,我想在使用者线程中使用get()
,如果队列已标记为完成",它会抛出异常.与通过附加哨兵值标记队列中的最后一项相比,有更好的通信方式吗?
I'm learning to use the Queue module, and am a bit confused about how a queue consumer thread can be made to know that the queue is complete. Ideally I'd like to use get()
from within the consumer thread and have it throw an exception if the queue has been marked "done". Is there a better way to communicate this than by appending a sentinel value to mark the last item in the queue?
推荐答案
原始(大部分已更改;请参见下面的更新)
基于close
方法的Queue.Queue
的后代.它以原始(未包装)模块的形式提供.我会再清理一点,并在我有更多时间时将其正确包装.目前,该模块仅包含CloseableQueue
类和Closed
异常类.我打算将其扩展为也包含Queue.LifoQueue
和Queue.PriorityQueue
的子类.
original (most of this has changed; see updates below)
Based on some of the suggestions (thanks!) of Glenn Maynard and others, I decided to roll up a descendant of Queue.Queue
that implements a close
method. It's available in the form of a primitive (unpackaged) module. I'll clean this up a bit and package it properly when I have a bit more time. For now the module only contains the CloseableQueue
class and the Closed
exception class. I'm planning to expand it to also include subclasses of Queue.LifoQueue
and Queue.PriorityQueue
.
它目前处于相当初步的状态,也就是说,尽管它通过了测试套件,但实际上我还没有使用它.你的旅费可能会改变.我将通过令人振奋的消息来不断更新此答案.
It's in a pretty preliminary state currently, which is to say that although it passes its test suite, I haven't actually used it for anything yet. Your mileage may vary. I'll keep this answer updated with exciting news.
CloseableQueue
类与Glenn的建议有些不同,因为关闭队列将阻止将来的put
,但不会阻止将来的get
,直到清空队列.这对我来说最有意义.清除队列的功能似乎可以作为单独的mixin *添加,而该mixin *与可关闭性功能正交.因此,基本上使用CloseableQueue
,通过关闭队列,您可以指示最后一个元素是put
.还可以通过将last=True
传递给最终的put
调用来原子地执行此操作.排空队列后,随后对put
的调用以及对get
的后续调用,以及与这些描述匹配的未解决阻塞调用,将引发Closed
异常.
The CloseableQueue
class differs a bit from Glenn's suggestion in that closing the queue will prevent future put
s, but not prevent future get
s until the queue is emptied. This made the most sense to me; it seemed like functionality to clear the queue could be added as a separate mixin* that would be orthogonal to the closeability functionality. So basically with CloseableQueue
, by closing the queue you indicate that the last element has been put
. There's also an option to do this atomically by passing last=True
to the final put
call. Subsequent calls to put
, and subsequent calls to get
once the queue is emptied, as well as outstanding blocked calls matching those descriptions, will raise the Closed
exception.
这在单个生产者为一个或多个消费者生成数据的情况下最有用,但对于消费者正在等待特定项目或一组项目的多重布局也很有用.特别是,它没有提供确定所有生产者都已完成生产的方法.获得这项工作将需要提供某种注册生产者的方式(.open()
?),以及表明生产者注册本身已经关闭的方式.
This is mostly useful for situations where a single producer is generating data for one or more consumers, but it could also be useful for a multi-multi arrangement where consumers are waiting for a particular item or set of items. In particular it doesn't provide a way to determine that all of a number of producers have finished production. Getting that working would entail the provision of some way to register producers (.open()
?), as well as a way to indicate that producer registration is itself closed.
非常欢迎提出建议和/或代码审查.我还没有编写很多并发代码,但是希望测试套件足够彻底,以确保代码通过它的事实表明了代码的质量,而不是套件的缺乏.我能够重用Queue模块测试套件中的一堆代码:文件本身包含在此模块中,并用作各种子类和例程(包括回归测试)的基础.这可能(希望)有助于避免测试部门完全无能为力.代码本身仅用很小的改动就覆盖了Queue.get
和Queue.put
,并添加了close
和closed
方法.
Suggestions and/or code reviews are quite welcome. I haven't written a whole lot of concurrency code, but hopefully the test suite is thorough enough that the fact that the code passes it is an indication of the code's quality, rather than the suite's lack thereof. I was able to reuse a bunch of the code from the Queue module's test suite: the file itself is included in this module and used as a basis for various subclasses and routines, including regression testing. This probably (hopefully) helped to avoid complete ineptitude in the testing department. The code itself just overrides Queue.get
and Queue.put
with fairly minimal changes, and adds the close
and closed
methods.
为了避免使代码与Queue模块本身一样向后兼容,我故意避免在上下文中使用任何新奇的幻想,例如上下文管理器.确实倒退了.我可能会在某个时候添加__enter__
和__exit__
方法;否则,contextlib的关闭函数应适用于CloseableQueue实例.
I've sort of intentionally avoided using any new-fangled fanciness like context managers in both the code itself and in the test suite in an effort to keep the code as backwards-compatible as is the Queue module itself, which is considerably backwards indeed. I'll probably add __enter__
and __exit__
methods at some point; otherwise, the contextlib's closing function should be applicable to a CloseableQueue instance.
*:在这里我宽松地使用术语"mixin".由于Queue
模块的类是老式的,因此需要使用类工厂函数来混合mixins.有一些限制;在Guido禁止的地方提供空位.
*: Here I use the term "mixin" loosely. As the Queue
module's classes are old-style, mixins would need to be mixed using class factory functions; some restrictions apply; offer void where prohibited by Guido.
CloseableQueue 模块现在提供了CloseableLifoQueue
和CloseablePriorityQueue
类.我还添加了一些便利功能来支持迭代.仍然需要将其作为适当的软件包进行返工.有一个类工厂函数,可以方便地将其他Queue.Queue
派生的类进行子类化.
The CloseableQueue module now provides CloseableLifoQueue
and CloseablePriorityQueue
classes. I've also added some convenience functions to support iteration. Still need to rework it as a proper package. There's a class factory function to allow for convenient subclassing of other Queue.Queue
-derived classes.
CloseableQueue
.与
$ easy_install CloseableQueue
欢迎发表评论和批评,特别是来自匿名回答者.
Comments and criticism are welcome, especially from this answer's anonymous downvoter.
这篇关于通讯结束的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!