Multiprocessing not spawning all the requested processes


Problem description

I am working with OrcaFlex (a finite element software for offshore analysis, though that should not be relevant here). I wrote a script to check whether the simulations I launched completed successfully (a simulation can fail because it does not reach convergence). Since I am talking about thousands of files, I tried to parallelise the check with multiprocessing. Here is my code. Apologies that I cannot give you a working example, but I will try to explain in detail. I created a derived class of multiprocessing.Process and overrode run() to perform the check on the simulation files. Then, in __main__, I set a number of processors, split the files accordingly, and started execution.

The problem is that the processes are not all spawned together; instead they start at seemingly random intervals from one another. Is this expected behaviour, or am I missing something? By not all being spawned I mean that I see:

[Info/Worker-1] child process calling self.run()

and, for example:

[Info/Worker-4] child process calling self.run()

only some 10 minutes after the program has started.

Thanks in advance for any help/suggestion.

import os
import subprocess
import glob
import multiprocessing
import logging
import sys
import OrcFxAPI as of

class Worker(multiprocessing.Process):

    myJobs = []

    def setJobs(self, jobList):
        self.myJobs = jobList

    @staticmethod
    def changedExtensionFileName(oldFileName, newExtension):
        return '.'.join((os.path.splitext(oldFileName)[0], newExtension))

    def run(self):
        failed = []
        model = of.Model(threadCount=1)

        for job in self.myJobs:
            try:
                print('%s starting' % job)
                sys.stdout.flush()
                model.LoadSimulation(job)
                if model.state == of.ModelState.SimulationStoppedUnstable:
                    newJob = job.replace('.sim', '.dat')
                    failed.append(newJob)

                    with open('Failed_Sim.txt', 'a') as f:
                        f.write(f'{newJob}\n')

                    model.LoadData(newJob)
                    model.general.ImplicitConstantTimeStep /= 2
                    model.SaveData(newJob)
                    print(f'{job} has failed, reducing time step')

            except of.DLLError as err:
                print('%s ERROR: %s' % (job, err))
                sys.stdout.flush()
                with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
                    f.write('%s error: %s' % (job, err))
        return



if __name__ == '__main__':
    import re
    sim_file = [f for f in os.listdir() if re.search(r'\d\d\d\d.*\.sim', f)]

    # begin multprocessing
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    corecount = 14 

    workers = []

    chunkSize = int(len(sim_file) / corecount)
    chunkRemainder = int(len(sim_file) % corecount)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))

    start = 0
    for coreNum in range(0, corecount):
        worker = Worker()
        workers.append(worker)
        end = start + chunkSize
        if chunkRemainder>0:
            chunkRemainder -= 1
            end += 1
        if end>len(sim_file):
            end = len(sim_file)
        worker.setJobs(sim_file[start:end])
        worker.start()
        start = end
        if start>=len(sim_file):
            break

    for worker in workers:
        worker.join()
    print('Done...')
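Not part of the original post, but one way to narrow down where the delay comes from: on Windows, multiprocessing spawns each child by re-importing the main module before run() executes, so a heavy module-level import such as OrcFxAPI could plausibly account for part of the gap. Below is a minimal, OrcFxAPI-free sketch that timestamps when each of 14 workers (matching corecount above) actually enters run():

import time
import multiprocessing

class TimedWorker(multiprocessing.Process):
    def __init__(self, t0):
        super().__init__()
        self.t0 = t0   # launch timestamp; pickled into the child on spawn

    def run(self):
        # record the moment this child actually begins executing run()
        print('%s entered run() %.2f s after launch' % (self.name, time.time() - self.t0))

if __name__ == '__main__':
    t0 = time.time()
    workers = [TimedWorker(t0) for _ in range(14)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

If the timestamps are spread out even with an empty run() like this, the delay is in process start-up itself rather than in the simulation checks.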

Recommended answer

OK, so no one has stuck their head up to answer this (and I'm not sure I know how to either!), so here comes a suggestion for a bigger restructure...

import OrcFxAPI as of   # still needed for of.Model / of.ModelState in the worker

def worker(inpData):
    #The worker process
    model = of.Model(threadCount=1)   # create the model in the worker, as the original run() did

    failed1 = []
    failed2 = []

    for job in inpData:   #I'm not sure of the data shape of the chunks, has your original method split them into coherent chunks capable of being processed independently? My step here could be wrong. 
        try:
            #print('%s starting' % job)  #Prints won't appear on console from worker processes from windows, so commented them all out
            model.LoadSimulation(job)
            if model.state == of.ModelState.SimulationStoppedUnstable:
                newJob = job.replace('.sim', '.dat')
                failed1.append(newJob)

                #I'd recommend we pass the list "failed" back to master and write to text from there, otherwise you could have several processes updating the text file at once, leading to possible loss of data
                #with open('Failed_Sim.txt', 'a') as f:
                #     f.write(f'{newJob}\n')
                #     f.close()

                model.LoadData(newJob)
                model.general.ImplicitConstantTimeStep /= 2
                model.SaveData(newJob)
                #print(f'{job} has failed, reducing time step')   

        except of.DLLError as err:
            #print('%s ERROR: %s' % (job, err))
            #sys.stdout.flush()
            #with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
            #    f.write('%s error: %s' % (job, err))
            #    f.close()
            failed2.append(job)

    #Note I've made two failed lists to pass back, for both failure types
    return failed1, failed2


if __name__ == "__main__":
    import os
    import re
    import multiprocessing as mp
    nCPUs = mp.cpu_count()

    sim_file = [f for f in os.listdir() if re.search(r'\d\d\d\d.*\.sim', f)]

    #Make the chunks
    chunkSize = int(len(sim_file) / nCPUs)
    chunkRemainder = int(len(sim_file) % nCPUs)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(nCPUs), chunkSize, chunkRemainder))

    chunks = []
    start = 0
    for iChunk in range(0, nCPUs):
        end = start + chunkSize
        if chunkRemainder>0:
            chunkRemainder -= 1
            end += 1
        if end>len(sim_file):
            end = len(sim_file)
        chunks.append(sim_file[start:end])
        start = end   # advance the window, otherwise every chunk starts at 0


    #Send to workers
    pool = mp.Pool(processes=nCPUs)
    futA = []

    for iChunk in range(0, nCPUs):
        futA.append(pool.apply_async(worker, args=(chunks[iChunk],)))
    

    #Gather results
    failedDat = []
    failedSim = []
    if futA:
        for iChunk in range(0, len(futA)):
            resA, resB = futA[iChunk].get()
            failedDat.extend(resA)
            failedSim.extend(resB)
    pool.close()
            
    if failedDat:
        print("Following jobs failed, reducing timesteps:")
        print(failedDat)
    if failedSim:
        print("Following sims failed due to errors")
        print(failedSim) 
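Following the comment in the worker about not letting several processes update the text file at once, here is a small sketch (not in the original answer) of what the master could do after gathering, reusing the Failed_Sim.txt name from the question; with a single writer there is no risk of concurrent appends:

    # write the gathered failures from the master only, so no two
    # processes ever append to Failed_Sim.txt at the same time
    if failedDat:
        with open('Failed_Sim.txt', 'a') as f:
            for newJob in failedDat:
                f.write(f'{newJob}\n')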

