Deadlock with big object in multiprocessing.Queue


Problem description

When you supply a large enough object to multiprocessing.Queue, the program seems to hang at weird places. Consider this minimal example:

import multiprocessing

def dump_dict(queue, size):
    queue.put({x: x for x in range(size)})
    print("Dump finished")

if __name__ == '__main__':
    SIZE = int(1e5)
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
    print("Starting...")
    process.start()
    print("Joining...")
    process.join()
    print("Done")
    print(len(queue.get()))

If the SIZE parameter is small enough (<= 1e4, at least in my case), the whole program runs smoothly without a problem, but once SIZE is big enough, the program hangs at weird places. Now, when searching for an explanation, e.g. python multiprocessing - process hangs on join for large queue, I have always seen general answers of "you need to consume from the queue". But what seems weird is that the program actually prints Dump finished, i.e. it reaches the code line after putting the object into the queue. Furthermore, using Queue.put_nowait instead of Queue.put did not make a difference.

Finally, if you use Process.join(1) instead of Process.join(), the whole program finishes with the complete dictionary in the queue (i.e. the print(len(..)) line will print 100000).
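
A sketch of that variant, assuming the same dump_dict as above (the explanation for why it completes follows in the answer below):

import multiprocessing

def dump_dict(queue, size):
    queue.put({x: x for x in range(size)})
    print("Dump finished")

if __name__ == '__main__':
    SIZE = int(1e5)
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
    process.start()
    process.join(1)       # times out after ~1 s; the child is still alive, blocked flushing
    result = queue.get()  # draining the pipe unblocks the child
    process.join()        # now the child can actually exit
    print(len(result))    # 100000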

Can somebody give me a little bit more insight into this?

Answer

You need to queue.get() in the parent before you process.join() to prevent a deadlock. The queue spawns a feeder thread with its first queue.put(), and the MainThread in your worker process joins this feeder thread before exiting. So the worker process won't exit before the result is completely flushed to the (OS pipe) buffer, but your result is too big to fit into the buffer, and your parent doesn't read from the queue until the worker has exited, resulting in a deadlock.
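
Applied to the example from the question, a minimal sketch of the fixed ordering (the only change is consuming from the queue before joining):

import multiprocessing

def dump_dict(queue, size):
    queue.put({x: x for x in range(size)})
    print("Dump finished")

if __name__ == '__main__':
    SIZE = int(1e5)
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
    print("Starting...")
    process.start()
    result = queue.get()  # consume first: this unblocks the child's feeder thread
    print("Joining...")
    process.join()        # safe now, the child can flush its buffer and exit
    print("Done")
    print(len(result))    # 100000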

You see the output of print("Dump finished") because the actual sending happens from the feeder thread; queue.put() itself just appends to a collections.deque within the worker process as an intermediate step.
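
To see that it really is the implicit join of the feeder thread that blocks the child's exit, you can opt out of that join with Queue.cancel_join_thread(). This is a demonstration sketch only, not a fix: data still sitting in the internal deque can be lost, so the parent must not rely on queue.get() afterwards.

import multiprocessing

def dump_dict(queue, size):
    queue.put({x: x for x in range(size)})
    # Tell the exiting process not to wait for the feeder thread.
    # Whatever is still buffered in the internal deque may be lost.
    queue.cancel_join_thread()
    print("Dump finished")

if __name__ == '__main__':
    SIZE = int(1e5)
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
    process.start()
    process.join()  # returns: the child no longer waits to flush the full result
    print("Joined without deadlock (the queued data may be incomplete)")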

