Clean way to get near-LIFO behavior from multiprocessing.Queue? (or even just *not* near-FIFO)

Question

Does anyone know a clean way to get near-LIFO or even not near-FIFO (e.g. random) behavior from multiprocessing.Queue?

Alternative Question: Could someone point me to the code for the thread that manages the actual storage structure behind multiprocessing.Queue? It seems like it would be trivial within that to provide approximately LIFO access, but I got lost in the rabbit hole trying to find it.

Notes:

  1. I believe multiprocessing.Queue does not guarantee order. Fine. But it is near-FIFO so near-LIFO would be great.
  2. I could pull all the current items off the queue and reverse the order before working with them, but I prefer to avoid a kludge if possible.
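
With a single consumer, the drain-and-reverse idea from note 2 can at least be hidden behind a small wrapper. This is a minimal sketch, assuming exactly one consumer; `LocalLifoConsumer` is a hypothetical name, not part of `multiprocessing`. Each `get()` moves everything that is currently available into a process-local list and pops the newest item off its end. The result is only near-LIFO: an item that has not reached the pipe yet is seen only at the next drain.

```python
import queue


class LocalLifoConsumer:
    """Hypothetical single-consumer LIFO view of a (near-)FIFO queue."""

    def __init__(self, shared_queue):
        self._q = shared_queue
        self._stack = []  # process-local; newest item is at the end

    def _drain(self):
        # Move every item that is available right now, without blocking.
        while True:
            try:
                self._stack.append(self._q.get_nowait())
            except queue.Empty:
                return

    def get(self, timeout=None):
        self._drain()
        if not self._stack:
            # Nothing buffered locally: block for one item (queue.Empty
            # propagates if the timeout expires), then grab any others.
            self._stack.append(self._q.get(timeout=timeout))
            self._drain()
        return self._stack.pop()  # most recently received item first
```

The wrapper relies only on `get`, `get_nowait` and `queue.Empty`, which `multiprocessing.Queue` shares with `queue.Queue`, so it can be tried out with a plain `queue.Queue` in a single process.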


(编辑)要澄清:我正在使用multiprocessing进行CPU绑定模拟,因此不能使用Queue中的专用队列.由于几天未见任何答案,因此我在上面添加了替代问题.


(edit) To clarify: I am doing a CPU-bound simulation with multiprocessing, so I can't use the specialized queues (such as LifoQueue) from the Queue module. Since I haven't seen any answers for a few days, I've added the alternative question above.

In case it is an issue, below is slight evidence that multiprocessing.Queue is near-FIFO. It just shows that in a simple case (a single thread), it is perfectly FIFO on my system:

```python
from __future__ import print_function, division  # lets this also run on Python 2

import multiprocessing as mp

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

q = mp.Queue()

for i in range(1000):
    q.put(i)

deltas = []
while True:
    try:
        value1 = q.get(timeout=0.1)
        value2 = q.get(timeout=0.1)
        deltas.append(value2 - value1)
    except queue.Empty:
        break

# Positive deltas would indicate the numbers are coming out in increasing order
min_delta, max_delta = min(deltas), max(deltas)
avg_delta = sum(deltas) / len(deltas)

print("min", min_delta)
print("max", max_delta)
print("avg", avg_delta)
```

Output: min, max, and average are all exactly 1 (perfect FIFO).

Answer

I've looked over the Queue class that lives in Lib/multiprocessing/queues.py in my Python installation (Python 2.7, but nothing obvious is different in the version from Python 3.2 that I briefly checked). Here's how I understand it works:
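
To locate that file in your own installation, the standard `inspect` module can report where the class is defined and print individual methods, such as the writer thread's `_feed` loop. This is just a convenience sketch; `_feed` is a private method, so its name and body may differ between Python versions.

```python
import inspect
import multiprocessing.queues

# multiprocessing.Queue() is a factory; the class is multiprocessing.queues.Queue.
source_file = inspect.getsourcefile(multiprocessing.queues.Queue)
print(source_file)

# Print the body of the private writer-thread loop.
feed_source = inspect.getsource(multiprocessing.queues.Queue._feed)
print(feed_source)
```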

The Queue object maintains two sets of objects. One set consists of multiprocess-safe primitives that are shared by all processes; the others are created and used separately by each process.

The cross-process objects are set up in the __init__ method:

  1. A Pipe object, whose two ends are saved as self._reader and self._writer.
  2. A BoundedSemaphore object, which counts (and optionally limits) how many objects are in the queue.
  3. A Lock object for reading the Pipe, and on non-Windows platforms another for writing. (I assume that this is because writing to a pipe is inherently multiprocess-safe on Windows.)
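
These cross-process objects can be inspected on a live queue. The sketch below pokes at private attributes (`_reader`, `_writer`, `_sem`, `_rlock`, `_wlock`) of CPython's implementation; they are not a public API, and their names may change between versions.

```python
import multiprocessing as mp
import multiprocessing.synchronize as mp_sync
import sys

q = mp.Queue(maxsize=5)

# 1. The two ends of a one-way Pipe: _reader only receives, _writer only sends.
print("reader: readable =", q._reader.readable, "writable =", q._reader.writable)
print("writer: readable =", q._writer.readable, "writable =", q._writer.writable)

# 2. The BoundedSemaphore that enforces maxsize.
print("semaphore is BoundedSemaphore:", isinstance(q._sem, mp_sync.BoundedSemaphore))

# 3. The read lock, plus a write lock that is None on Windows.
print("read lock is Lock:", isinstance(q._rlock, mp_sync.Lock))
print("write lock:", q._wlock)
print("running on Windows:", sys.platform == "win32")
```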

The per-process objects are set up in the _after_fork and _start_thread methods:

  1. A collections.deque object used to buffer writes to the Pipe.
  2. A threading.Condition object used to signal when the buffer is not empty.
  3. A threading.Thread object that does the actual writing. It is created lazily, so it won't exist until at least one write to the Queue has been requested in a given process.
  4. Various Finalize objects that clean stuff up when the process ends.
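
The per-process side, including the lazy creation of the writer thread, can be observed the same way. Again, `_buffer`, `_notempty` and `_thread` are private CPython attributes used here only for illustration, not something to rely on in real code.

```python
import collections
import multiprocessing as mp
import threading

q = mp.Queue()

# 1. Per-process deque that buffers objects before they reach the pipe.
print("buffer is a deque:", isinstance(q._buffer, collections.deque))
# 2. Condition the writer thread waits on while the buffer is empty.
print("notempty is a Condition:", isinstance(q._notempty, threading.Condition))
# 3. The writer ("feeder") thread is created lazily, on the first put().
thread_before_put = q._thread
print("thread before put:", thread_before_put)

q.put("hello")
print("thread after put:", q._thread)
value = q.get()
print("got:", value)
```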

A get from the queue is pretty simple. You acquire the read lock, grab an object from the read end of the Pipe, and release the semaphore, which frees up one slot for put.

A put is more complicated, because it involves a second thread. The caller of put first acquires the semaphore (blocking if the queue is already full), then grabs the condition's lock, starts up the writer thread if it isn't running yet, adds its object to the buffer, and signals the condition before unlocking it.

The writer thread loops forever (until canceled) in the _feed method. If the buffer is empty, it waits on the notempty condition. Then it takes items from the front of the buffer one at a time (oldest first), acquires the write lock (if there is one), and writes each item to the Pipe.
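
To make the put/feed/get interplay concrete, here is a heavily simplified model built from the same primitives. `ToyQueue` is a hypothetical class for illustration only, not the real `multiprocessing.Queue` code: it has no timeouts, no `close()`/`join_thread()`, and no fork handling. Because the writer thread pops from the left of the deque and the pipe delivers messages in the order they were written, a single producer always sees FIFO order.

```python
import collections
import multiprocessing as mp
import threading


class ToyQueue:
    """Hypothetical, simplified model of multiprocessing.Queue's structure."""

    def __init__(self, maxsize=1000):
        # Cross-process objects.
        self._reader, self._writer = mp.Pipe(duplex=False)
        self._rlock = mp.Lock()
        self._wlock = mp.Lock()
        self._sem = mp.BoundedSemaphore(maxsize)
        # Per-process objects.
        self._buffer = collections.deque()
        self._notempty = threading.Condition(threading.Lock())
        self._thread = None  # writer thread, created on the first put()

    def put(self, obj):
        self._sem.acquire()  # take a free slot (blocks if the queue is full)
        with self._notempty:
            if self._thread is None:
                self._thread = threading.Thread(target=self._feed, daemon=True)
                self._thread.start()
            self._buffer.append(obj)
            self._notempty.notify()

    def _feed(self):
        while True:
            with self._notempty:
                while not self._buffer:
                    self._notempty.wait()
                obj = self._buffer.popleft()  # oldest buffered item first
            with self._wlock:
                self._writer.send(obj)  # the pipe preserves write order

    def get(self):
        with self._rlock:
            obj = self._reader.recv()
        self._sem.release()  # free the slot again
        return obj
```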

So, given all of that, can you modify it to get a LIFO queue? It doesn't seem easy. Pipes are inherently FIFO objects, and while the Queue can't guarantee FIFO behavior overall (due to the asynchronous nature of the writes from multiple processes) it is always going to be mostly FIFO.

If you have only a single consumer, you could get objects from the queue and add them to your own process-local stack. It would be harder to do a multi-consumer stack, though with shared memory a bounded-size stack wouldn't be too hard. You'd need a lock, a pair of conditions (for blocking/signaling on full and empty states), a shared integer value (for the number of values held) and a shared array of an appropriate type (for the values themselves).
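
The multi-consumer recipe above could look roughly like the following. It is a minimal sketch, assuming every value fits a single ctypes type code (`'i'`, C int, by default); `SharedLifoStack` is a hypothetical class, not part of `multiprocessing`. It uses one `multiprocessing.Lock`, two `multiprocessing.Condition` objects built on that lock, a shared counter from `RawValue`, and a shared `RawArray` for the values. The raw (unlocked) variants are enough because the explicit lock already guards them.

```python
import multiprocessing as mp
import queue


class SharedLifoStack:
    """Hypothetical bounded LIFO stack that several processes can share."""

    def __init__(self, capacity, typecode="i"):
        self._capacity = capacity
        self._lock = mp.Lock()
        # Two conditions on the same lock: "not full" and "not empty".
        self._not_full = mp.Condition(self._lock)
        self._not_empty = mp.Condition(self._lock)
        # Shared count of stored values and shared storage for the values.
        self._count = mp.RawValue("i", 0)
        self._items = mp.RawArray(typecode, capacity)

    def put(self, value, timeout=None):
        with self._lock:
            # Wait for room; wait_for returns False if the timeout expires.
            if not self._not_full.wait_for(
                lambda: self._count.value < self._capacity, timeout
            ):
                raise queue.Full
            self._items[self._count.value] = value
            self._count.value += 1
            self._not_empty.notify()  # wake one waiting consumer

    def get(self, timeout=None):
        with self._lock:
            # Wait until there is something to pop.
            if not self._not_empty.wait_for(lambda: self._count.value > 0, timeout):
                raise queue.Empty
            self._count.value -= 1
            value = self._items[self._count.value]  # top of the stack
            self._not_full.notify()  # wake one waiting producer
            return value
```

As with other `multiprocessing` synchronization objects, an instance is meant to be handed to child processes when they are created (for example through `Process(args=...)`), not sent through a queue afterwards.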
