python multiprocessing .join() deadlock depends on worker function


Problem description

I am using the multiprocessing python library to spawn 4 Process() objects to parallelize a cpu intensive task. The task (inspiration and code from this great article) is to compute the prime factors for every integer in a list.

main.py:

import random
import multiprocessing
import sys

num_inputs  = 4000
num_procs   = 4
proc_inputs = num_inputs/num_procs
input_list  = [int(1000*random.random()) for i in xrange(num_inputs)]

output_queue = multiprocessing.Queue()
procs        = []
for p_i in xrange(num_procs):
  print "Process [%d]"%p_i
  proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
  print " - num inputs: [%d]"%len(proc_list)

  # Using target=worker1 HANGS on join
  p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
  # Using target=worker2 RETURNS with success
  #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))

  procs.append(p)
  p.start()

for p in procs:
  print "joining ", p, output_queue.qsize(), output_queue.full()
  p.join()
  print "joined  ", p, output_queue.qsize(), output_queue.full()

print "Processing complete."
ret_vals = []
while output_queue.empty() == False:
    ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)

Observations:

  • If the target for each process is the function worker1, for an input list larger than 4000 elements the main thread gets stuck on .join(), waiting for the spawned processes to terminate and never returns.
  • If the target for each process is the function worker2, for the same input list the code works just fine and the main thread returns.

This is very confusing to me, as the only difference between worker1 and worker2 (see below) is that the former inserts individual lists in the Queue whereas the latter inserts a single list of lists for each process.

Why is there a deadlock when using the worker1 target but not the worker2 target? Shouldn't both (or neither) exceed the multiprocessing Queue maxsize limit of 32767?

worker1 vs worker2:

def worker1(proc_num, proc_list, output_queue):
    '''worker function which deadlocks'''  
    for num in proc_list:
        output_queue.put(factorize_naive(num))

def worker2(proc_num, proc_list, output_queue):
    '''worker function that works'''
    workers_stuff = []

    for num in proc_list:
        workers_stuff.append(factorize_naive(num))
    output_queue.put(workers_stuff)


There are a lot of similar questions on SO, but I believe the core of this question is clearly distinct from all of them.

Related links:

  • https://sopython.com/canon/82/programs-using-multiprocessing-hang-deadlock-and-never-complete/
  • python multiprocessing issues
  • python multiprocessing - process hangs on join for large queue
  • Process.join() and queue don't work with large numbers
  • Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
  • Script using multiprocessing module does not terminate
  • Why does multiprocessing.Process.join() hang?
  • When to call .join() on a process?
  • What exactly is Python multiprocessing Module's .join() Method Doing?

Recommended answer

The docs warn about this:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

While a Queue appears to be unbounded, under the covers queued items are buffered in memory to avoid overloading inter-process pipes. A process cannot end normally before those memory buffers are flushed. Your worker1() puts a lot more items on the queue than your worker2(), and that's all there is to it. Note that the number of items that can be queued before the implementation resorts to buffering in memory isn't defined: it can vary across OS and Python release.
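To make the mechanism concrete, here is a minimal, self-contained sketch of the same failure mode (hypothetical code, not from the question, written in the same Python 2 style): the child puts far more data on a Queue than the inter-process pipe can hold, the parent joins without ever draining the queue, and join() blocks forever because the child's feeder thread cannot flush its buffer.

import multiprocessing

def flood(q):
    # Put far more (pickled) data than an OS pipe buffer can hold.
    for i in xrange(100000):
        q.put(i)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=flood, args=(q,))
    p.start()
    p.join()   # deadlocks: nothing ever calls q.get(), so the child cannot exit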

As the docs suggest, the normal way to avoid this is to .get() all the items off the queue before you attempt to .join() the processes. As you've discovered, whether it's necessary to do so depends in an undefined way on how many items have been put on the queue by each worker process.
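
Applied to the question's main.py, a minimal sketch of that fix (reusing the names procs, output_queue and num_inputs from the question, and assuming the worker1 target, which puts exactly one item per input number) drains the queue first and joins afterwards:

ret_vals = []
for _ in xrange(num_inputs):
    # Blocks until a result arrives; draining lets each child's feeder
    # thread flush its buffered items so the child can exit.
    ret_vals.append(output_queue.get())

for p in procs:
    p.join()   # safe now: no buffered items remain in the children

print "Processing complete."
print len(ret_vals)

If the exact item count is not known in advance, an alternative is to have each worker put a sentinel value when it finishes and keep calling output_queue.get() until num_procs sentinels have been seen, then join.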
