Multiprocessing has cutoff at 992 integers being joined as result


Question

I am following this book http://doughellmann.com/pages/python-standard-library-by-example.html


Along with some online references, I have some algorithm set up for multiprocessing where I have a large array of dictionaries and do some calculation on them. I use multiprocessing to divide up the indexes on which the calculations are done on the dictionary. To make the question more general, I replaced the algorithm with just some array of return values. From finding information online and from other SO posts, I think it has to do with the join method.

The structure is like so:


Generate some fake data, call the manager function for multiprocessing, create a Queue, and divide the data over the number of indexes. Loop through the number of processes to use, sending each process function the correct index range. Lastly, join the processes and print out the results.


What I have figured out is that if the function used by the processes tries to return range(0,992), it works quickly; if it returns range(0,993), it hangs. I tried this on two different computers with different specs.

The code is posted here:

import multiprocessing

def main():
    data = []
    for i in range(0,10):
        data.append(i)

    CalcManager(data,start=0,end=50)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we don't go past the size of the data
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    
    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = range(0,(992))
    result_q.put(results)
    return

if __name__== '__main__':   
    main()


Is there something about these numbers specifically or am I just missing something basic that has nothing to do with these numbers?


From my searches, it seems this is some memory issue with the join method, but the book does not really explain how to solve this using this setup. Is it possible to use this structure (I understand it mostly, so it would be nice if I can continue to use it) and still pass back large results? I know there are other methods to share data between processes, but that's not what I need; I just want to return the values and join them into one array once completed.

Answer


I can't reproduce this on my machine, but it sounds like the items put into the queue haven't been flushed to the underlying pipe. This will cause a deadlock if you try to terminate the process, according to the docs:


As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
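As a rough illustration of the behaviour described in that quote (the worker function and the result size below are made up for the sketch, not taken from the question's code): a child process puts a result onto a Queue that is too large to be flushed entirely into the underlying pipe buffer, and the parent joins before reading anything back, so both sides end up waiting on each other. Roughly speaking, the cutoff point depends on how much pickled data fits in that buffer before the queue's feeder thread blocks, which is why the problem appears around some specific number of integers.

import multiprocessing

def worker(result_q):
    # This result is large enough that the queue's feeder thread cannot
    # flush all of it into the pipe until the parent starts reading.
    result_q.put([0] * 100000)

if __name__ == '__main__':
    result_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(result_q,))
    p.start()
    p.join()               # may hang forever: the child cannot exit while data is still buffered
    print(result_q.get())  # never reached if the join above deadlocks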


If you're in this situation, your p.join() calls will hang forever, because there's still buffered data in the queue. You can avoid it by consuming from the queue before you join the processes:

#Print out the results
for i in range(nprocs):
    result = result_q.get()
    print result

#Join the processes to wait for all data/processes to be finished
for p in procs:
    p.join()


This doesn't affect the way the code works: each result_q.get() call will block until a result is placed on the queue, which has the same effect as calling join on all processes prior to calling get. The only difference is that you avoid the deadlock.
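If the goal from the question is to end up with all the returned values merged into one array, the same consume-then-join ordering handles that as well. The following is only a sketch: it reuses the nprocs, procs and result_q variables from the CalcManager function above, and all_results is an illustrative name that is not part of the original code.

#Drain the queue first, merging each worker's list into one array
all_results = []
for i in range(nprocs):
    all_results.extend(result_q.get())  # blocks until a result is available

#Only join once the queue has been drained, so nothing is left buffered in the pipe
for p in procs:
    p.join()

print(len(all_results))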
