Python无法使用multiprocessing.pool分配内存 [英] Python cannot allocate memory using multiprocessing.pool

查看:535
本文介绍了Python无法使用multiprocessing.pool分配内存的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的代码(遗传优化算法的一部分)并行运行几个过程,等待所有过程完成,读取输出,然后使用不同的输入重复执行.当我进行60次重复测试时,一切工作正常.由于它有效,所以我决定使用更实际的重复次数200.我收到此错误:

My code (part of a genetic optimization algorithm) runs a few processes in parallel, waits for all of them to finish, reads the output, and then repeats with a different input. Everything was working fine when I tested with 60 repetitions. Since it worked, I decided to use a more realistic number of repetitions, 200. I received this error:

File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
 self.run()
File "/usr/lib/python2.7/threading.py", line 504, in run
 self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 302, in _handle_workers
 pool._maintain_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 206, in _maintain_pool
 self._repopulate_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 199, in _repopulate_pool
 w.start()
File "/usr/lib/python2.7/multiprocessing/process.py", line 130, in start
 self._popen = Popen(self)
File "/usr/lib/python2.7/multiprocessing/forking.py", line 120, in __init__
 self.pid = os.fork()
OSError: [Errno 12] Cannot allocate memory

这是我的使用pool的代码的片段:

Here is a snippet of my code that uses pool:

def RunMany(inputs):
from multiprocessing import cpu_count, Pool
proc=inputs[0]
pool=Pool(processes = proc) 
results=[]
for arg1 in inputs[1]:
    for arg2 in inputs[2]:
        for arg3 in inputs[3]:
            results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
casenum=0
datadict=dict()
for p in results:
    #get results of simulation once it has finished
    datadict[casenum]=p.get() 
    casenum+=1
return datadict

RunOne函数在我创建的类中创建一个对象,使用计算量大的python包解决耗时约30秒的化学问题,并将该对象与化学求解器的输出一起返回.

The RunOne function creates an object in class I created, uses a computationally-heavy python package to solve a chemistry problem that takes about 30 seconds, and returns the object with the output of the chemistry solver.

所以,我的代码串行调用RunMany,然后RunMany并行调用RunOne.在测试中,我使用10个处理器(计算机有16个)和对RunOne的20个调用池调用了RunOne.换句话说,len(arg1)* len(arg2)* len(arg3)= 20.当我的代码调用RunMany 60次时,一切工作正常,但是当我调用200次时,我的内存不足.

So, my code calls RunMany in serial, and RunMany then calls RunOne in parallel. In my testing, I've called RunOne using 10 processors (the computer has 16) and a pool of 20 calls to RunOne. In other words, len(arg1)*len(arg2)*len(arg3)=20. Everything worked fine when my code called RunMany 60 times, but I ran out of memory when I called it 200 times.

这是否意味着某些过程本身无法正确清除?我有内存泄漏吗?如何确定我是否有内存泄漏,以及如何找出泄漏的原因?我的200次重复循环中唯一增加的项是一个从0大小增长到200长度的数字列表.我有一个自定义类的对象字典,但该对象的上限为共有50个条目-每次执行循环时,都会从字典中删除一个项目,然后将其替换为另一个项目.

Does this mean some process isn't correctly cleaning up after itself? Do I have a memory leak? How can I determine if I have a memory leak, and how do I find out the cause of the leak? The only item that is growing in my 200-repetition loop is a list of numbers that grows from 0 size to a length of 200. I have a dictionary of objects from a custom class I've built, but it is capped at a length of 50 entries - each time the loop executes, it deletes an item from the dictionary and replaces it with another item.

这是调用RunMany的代码的片段

Here is a snippet of the code that calls RunMany

for run in range(nruns):
    #create inputs object for RunMany using genetic methods. 
    #Either use starting "population" or create "child" inputs from successful previous runs
    datadict = RunMany(inputs)

    sumsquare=0
    for i in range(len(datadictsenk)): #input condition
        sumsquare+=Compare(datadict[i],Target[i]) #compare result to target

    with open(os.path.join(mainpath,'Outputs','output.txt'),'a') as f:
        f.write('\t'.join([str(x) for x in [inputs.name, sumsquare]])+'\n')

    Objective.append(sumsquare) #add sum of squares to list, to be plotted outside of loop
    population[inputs]=sumsquare #add/update the model in the "population", using the inputs object as a key, and it's objective function as the value
    if len(population)>initialpopulation:
        population = PopulationReduction(population) #reduce the "population" by "killing" unfit "genes"
    avgtime=(datetime.datetime.now()-starttime2)//(run+1)
    remaining=(nruns-run-1)*avgtime
    print(' Finished '+str(run+1)+' / ' +str(nruns)+'. Elapsed: '+str(datetime.datetime.now().replace(microsecond=0)-starttime)+' Remaining: '+str(remaining)+' Finish at '+str((datetime.datetime.now()+remaining).replace(microsecond=0))+'~~~', end="\r")

推荐答案

如对我的问题的评论所示,答案来自Puciek.

As shown in the comments to my question, the answer came from Puciek.

解决方案是在完成后关闭进程池.我认为它将自动关闭,因为results变量是RunMany的本地变量,并且在RunMany完成后将被删除.但是,python并不总是能按预期工作.

The solution was to close the pool of processes after it is finished. I thought that it would be closed automatically because the results variable is local to RunMany, and would be deleted after RunMany completed. However, python doesn't always work as expected.

固定代码为:

def RunMany(inputs):
from multiprocessing import cpu_count, Pool
proc=inputs[0]
pool=Pool(processes = proc) 
results=[]
for arg1 in inputs[1]:
    for arg2 in inputs[2]:
        for arg3 in inputs[3]:
            results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
#new section
pool.close()
pool.join()    
#end new section
casenum=0
datadict=dict()
for p in results:
    #get results of simulation once it has finished
    datadict[casenum]=p.get() 
    casenum+=1
return datadict

这篇关于Python无法使用multiprocessing.pool分配内存的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆