Python multiprocessing Pool map and imap


Question


I have a multiprocessing script with pool.map that works. The problem is that not all processes take as long to finish, so some processes fall asleep because they wait until all processes are finished (same problem as in this question). Some files are finished in less than a second, others take minutes (or hours).


If I understand the manual (and this post) correctly, pool.imap does not wait for all processes to finish: as soon as one is done, it hands out a new file to process. When I try that, the script speeds through the files to process; the small ones are processed as expected, but the large files (which take more time to process) don't finish before the end (are they killed without notice?). Is this normal behavior for pool.imap, or do I need to add more commands/parameters? When I add time.sleep(100) in the else part as a test, it processes more of the large files, but the other processes fall asleep. Any suggestions? Thanks

import os
import time
from multiprocessing import Pool

def process_file(infile):
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = []
        for infile in os.listdir():  
            todolist.append(infile)
        try:   
            p = Pool(processes=nprocesses)
            p.imap(process_file, todolist)
        except KeyboardInterrupt:                
            print("Shutting processes down")
            # Optionally try to gracefully shut down the worker processes here.
            p.close()
            p.terminate()
            p.join()
        except StopIteration:
            continue     
        else:
            time.sleep(100)
            os.chdir('..')
        p.close()
        p.join() 

if __name__ == '__main__':
    main()    

Answer


Since you already put all your files into a list, you could put them directly into a queue. The queue is then shared with your sub-processes, which take the file names from the queue and do their stuff. There is no need to do it twice (first into a list, then have Pool.imap pickle the list). Pool.imap is doing exactly the same, just without you knowing it.

todolist = []
for infile in os.listdir():  
    todolist.append(infile)

can be replaced by:

todolist = Queue()
for infile in os.listdir():  
    todolist.put(infile)

The complete solution would look like this:

import os
from multiprocessing import Process, Queue

def process_file(inqueue):
    # keep pulling file names until inqueue.get returns the "STOP" sentinel
    for infile in iter(inqueue.get, "STOP"):
        # read infile
        # compare things in infile
        # acquire Lock, save things in outfile, release Lock
        # delete infile
        pass

def main():
    nprocesses = 8
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)
        todolist = Queue()
        for infile in os.listdir():
            todolist.put(infile)
        processes = [Process(target=process_file, args=(todolist,))
                     for x in range(nprocesses)]
        # tell the processes to stop when all files are handled:
        # one "STOP" sentinel per worker, at the very end of the queue
        for p in processes:
            todolist.put("STOP")
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        os.chdir('..')

if __name__ == '__main__':
    main()
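The iter(callable, sentinel) idiom used in process_file above is plain built-in Python: iter(q.get, "STOP") calls q.get() repeatedly and stops as soon as a call returns "STOP". A small stand-alone demonstration with the thread-safe queue.Queue:

```python
from queue import Queue

q = Queue()
for item in ['file1', 'file2', 'STOP']:
    q.put(item)

# iter(q.get, 'STOP') keeps calling q.get() until it returns 'STOP';
# the sentinel itself is consumed but not yielded
seen = list(iter(q.get, 'STOP'))
print(seen)  # ['file1', 'file2']
```

This is why the answer puts exactly one "STOP" per worker at the end of the queue: each worker consumes one sentinel and exits its loop.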
