Process.join() and queue don't work with large numbers


Problem description

I am trying to split up a for loop, i.e.

N = 1000000
for i in xrange(N):
    #do something

using multiprocessing.Process, and it works well for small values of N. The problem arises when I use bigger values of N. Something strange happens before or during p.join() and the program doesn't respond. If I put print i instead of q.put(i) in the definition of the function f, everything works well.

I would appreciate any help. Here is the code.

from multiprocessing import Process, Queue

def f(q,nMin, nMax): # function for multiprocessing
    for i in xrange(nMin,nMax):
        q.put(i)

if __name__ == '__main__':

    nEntries = 1000000

    nCpu = 10
    nEventsPerCpu = nEntries/nCpu
    processes = []

    q = Queue()

    for i in xrange(nCpu):
        processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )

    for p in processes:
        p.start()

    for p in processes:
        p.join()       # hangs here for large nEntries: the queue is never drained

    print q.qsize()

Recommended answer

You are trying to grow your queue without bounds, and you are joining a subprocess that is waiting for room in the queue, so your main process is stalled waiting for that one to complete, and it never will. (A child process that has put items on a multiprocessing.Queue will not exit until its background feeder thread has flushed all of those items to the underlying pipe; with a large N the pipe's buffer fills up because nothing reads from it, the feeder threads block, and the parent blocks forever in p.join().)

If you pull data out of the queue before the join, it will work fine.
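
For example, here is a minimal sketch of that fix applied to the code above (Python 2, matching the original; the count tally is only there to show that all items arrive):

from multiprocessing import Process, Queue
from Queue import Empty  # exception raised by multiprocessing.Queue.get() (queue.Empty in Python 3)

def f(q, nMin, nMax):
    for i in xrange(nMin, nMax):
        q.put(i)

if __name__ == '__main__':
    nEntries = 1000000
    nCpu = 10
    nEventsPerCpu = nEntries / nCpu

    q = Queue()
    processes = [Process(target=f, args=(q, i*nEventsPerCpu, (i+1)*nEventsPerCpu))
                 for i in xrange(nCpu)]
    for p in processes:
        p.start()

    # Drain the queue while the children are still producing, so the pipe
    # never fills up and the feeder threads are never stuck.
    count = 0
    while any(p.is_alive() for p in processes) or not q.empty():
        try:
            q.get(timeout=0.1)
            count += 1
        except Empty:
            pass

    for p in processes:  # safe now: nothing is left buffered in the children
        p.join()

    print count  # 1000000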

One technique you could use is something like this:

while 1:
    running = any(p.is_alive() for p in processes)
    while not q.empty():
        process_queue_data(q.get())   # placeholder: handle one item pulled off the queue
    if not running:
        break

According to the documentation, p.is_alive() should perform an implicit join, but it also appears to imply that the best practice might be to explicitly perform joins on all the processes after this.
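
In other words, once that loop breaks, follow it with the explicit joins (every child has already reported not alive at that point, so these return immediately):

for p in processes:
    p.join()   # explicit join after the drain loop, as the docs suggest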

Although that is pretty clear, it may not be all that performant. How you make it perform better will be highly task- and machine-specific (and in general, you shouldn't be creating that many processes at a time anyway, unless some are going to be blocked on I/O).

Besides reducing the number of processes to the number of CPUs, some easy fixes to make it a bit faster (again, depending on circumstances) might look like this:

import time
import Queue   # needed for the Queue.Empty exception (the module is named queue in Python 3)

liveprocs = list(processes)
while liveprocs:
    try:
        while 1:
            process_queue_data(q.get(False))   # non-blocking get; drain everything available
    except Queue.Empty:
        pass

    time.sleep(0.5)    # Give tasks a chance to put more data in
    if not q.empty():
        continue
    liveprocs = [p for p in liveprocs if p.is_alive()]
