Python cannot allocate memory using multiprocessing.pool

Problem Description

My code (part of a genetic optimization algorithm) runs a few processes in parallel, waits for all of them to finish, reads the output, and then repeats with a different input. Everything was working fine when I tested with 60 repetitions. Since it worked, I decided to use a more realistic number of repetitions, 200. I received this error:

File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
 self.run()
File "/usr/lib/python2.7/threading.py", line 504, in run
 self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 302, in _handle_workers
 pool._maintain_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 206, in _maintain_pool
 self._repopulate_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 199, in _repopulate_pool
 w.start()
File "/usr/lib/python2.7/multiprocessing/process.py", line 130, in start
 self._popen = Popen(self)
File "/usr/lib/python2.7/multiprocessing/forking.py", line 120, in __init__
 self.pid = os.fork()
OSError: [Errno 12] Cannot allocate memory

Here is a snippet of my code that uses pool:

def RunMany(inputs):
    from multiprocessing import cpu_count, Pool
    proc=inputs[0]
    pool=Pool(processes = proc)
    results=[]
    for arg1 in inputs[1]:
        for arg2 in inputs[2]:
            for arg3 in inputs[3]:
                results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
    casenum=0
    datadict=dict()
    for p in results:
        #get results of simulation once it has finished
        datadict[casenum]=p.get()
        casenum+=1
    return datadict

The RunOne function creates an object of a class I created, uses a computationally heavy Python package to solve a chemistry problem that takes about 30 seconds, and returns the object with the output of the chemistry solver.

So, my code calls RunMany in serial, and RunMany then calls RunOne in parallel. In my testing, I've called RunOne using 10 processors (the computer has 16) and a pool of 20 calls to RunOne. In other words, len(arg1)*len(arg2)*len(arg3)=20. Everything worked fine when my code called RunMany 60 times, but I ran out of memory when I called it 200 times.
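The failure mode can be reproduced in miniature: each call to RunMany builds a fresh Pool, and if a pool is never closed its worker processes stay alive, so forked children accumulate until os.fork() fails with ENOMEM. A small sketch (the names work and count_leaked_workers are illustrative, not from the original code) that counts the live children left behind by unclosed pools:

```python
import multiprocessing

def work(x):
    return x * x

def count_leaked_workers(n_pools=3, procs=2):
    # Open several pools without closing them, the way repeated
    # RunMany calls effectively did, and count the live children.
    pools = [multiprocessing.Pool(processes=procs) for _ in range(n_pools)]
    for pool in pools:
        pool.map(work, range(4))
    leaked = len(multiprocessing.active_children())
    for pool in pools:
        pool.close()   # stop accepting work
        pool.join()    # reap the worker processes
    return leaked

if __name__ == '__main__':
    print(count_leaked_workers())  # 3 pools x 2 workers = 6 live children
```

Every unclosed Pool keeps its workers running even after their tasks finish; multiprocessing.active_children() reports them until close()/join() reaps them.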

Does this mean some process isn't correctly cleaning up after itself? Do I have a memory leak? How can I determine whether I have a memory leak, and how do I find its cause? The only item that grows in my 200-repetition loop is a list of numbers that grows from empty to a length of 200. I also have a dictionary of objects from a custom class I've built, but it is capped at 50 entries - each time the loop executes, it deletes one item from the dictionary and replaces it with another.
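One empirical way to answer the "do I have a leak?" question is to log the process's peak resident set size once per repetition and see whether it climbs without bound. A minimal sketch using the standard-library resource module (Unix-only; rss_kb is an illustrative helper, and the units of ru_maxrss are kilobytes on Linux but bytes on macOS):

```python
import resource

def rss_kb():
    # Peak resident set size of this process so far.
    # Units: kilobytes on Linux, bytes on macOS.
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

baseline = rss_kb()
history = []
for run in range(5):
    # Stand-in for one RunMany iteration; a real leak would
    # show the peak climbing on every pass.
    history.append(rss_kb())
print('baseline:', baseline, 'per-run peaks:', history)
```

If the logged peak grows roughly linearly with the repetition count, something is accumulating; if it plateaus, the OSError is more likely caused by leaked child processes than by objects in the parent.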

Here is a snippet of the code that calls RunMany:

for run in range(nruns):
    #create inputs object for RunMany using genetic methods.
    #Either use starting "population" or create "child" inputs from successful previous runs
    datadict = RunMany(inputs)

    sumsquare=0
    for i in range(len(datadictsenk)): #input condition
        sumsquare+=Compare(datadict[i],Target[i]) #compare result to target

    with open(os.path.join(mainpath,'Outputs','output.txt'),'a') as f:
        f.write('\t'.join([str(x) for x in [inputs.name, sumsquare]])+'\n')

    Objective.append(sumsquare) #add sum of squares to list, to be plotted outside of loop
    population[inputs]=sumsquare #add/update the model in the "population", using the inputs object as a key, and its objective function as the value
    if len(population)>initialpopulation:
        population = PopulationReduction(population) #reduce the "population" by "killing" unfit "genes"
    avgtime=(datetime.datetime.now()-starttime2)//(run+1)
    remaining=(nruns-run-1)*avgtime
    print(' Finished '+str(run+1)+' / '+str(nruns)+'. Elapsed: '+str(datetime.datetime.now().replace(microsecond=0)-starttime)+' Remaining: '+str(remaining)+' Finish at '+str((datetime.datetime.now()+remaining).replace(microsecond=0))+'~~~', end="\n")

Recommended Answer

As shown in the comments to my question, the answer came from Puciek.

The solution was to close the pool of processes once it is finished with. I thought it would be cleaned up automatically because the results variable is local to RunMany and would be deleted after RunMany completed. However, Python doesn't always work as expected.

The fixed code is:

def RunMany(inputs):
    from multiprocessing import cpu_count, Pool
    proc=inputs[0]
    pool=Pool(processes = proc)
    results=[]
    for arg1 in inputs[1]:
        for arg2 in inputs[2]:
            for arg3 in inputs[3]:
                results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
    #new section
    pool.close()
    pool.join()
    #end new section
    casenum=0
    datadict=dict()
    for p in results:
        #get results of simulation once it has finished
        datadict[casenum]=p.get()
        casenum+=1
    return datadict
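On Python 3.3+ this cleanup can be made impossible to forget, because Pool is a context manager. Note that its __exit__ calls terminate() rather than close()/join(), so results must be fetched inside the with block. A sketch of that pattern (run_one and run_many are illustrative stand-ins, not the original chemistry code):

```python
from multiprocessing import Pool

def run_one(a, b, c):
    # Stand-in for the 30-second chemistry solver.
    return a + b + c

def run_many(proc, args1, args2, args3):
    # Pool.__exit__ calls terminate(), so collect results before leaving the block.
    with Pool(processes=proc) as pool:
        results = [pool.apply_async(run_one, (a, b, c))
                   for a in args1 for b in args2 for c in args3]
        return {i: r.get() for i, r in enumerate(results)}

if __name__ == '__main__':
    print(run_many(2, [1], [2], [3, 4]))  # {0: 6, 1: 7}
```

Because the workers are torn down on every exit from the with block, repeated calls can no longer accumulate leaked child processes.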
