具有for循环的多处理池 [英] Multiprocessing Pool with a for loop

查看:9
本文介绍了具有for循环的多处理池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个文件列表,我将这些文件传递给for循环并执行一大堆函数。将其并行化的最简单方法是什么?我不确定在任何地方都能找到这样的东西,而且我认为我当前的实现是不正确的,因为我只看到一个文件在运行。从我所做的一些阅读来看,我认为这应该是一个完全平行的情况。

旧代码是这样的:

import pandas as pd
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
for file in filenames:
    file1 = pd.read_csv(file)
    print('running ' + str(file))
    a = function1(file1)
    b = function2(a)
    c = function3(b)
    for d in range(1,6):
            e = function4(c, d)
    c.to_csv('output.csv')

(错误)并行化代码

import pandas as pd
from multiprocessing import Pool
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
def multip(filenames):
    file1 = pd.read_csv(file)
    print('running ' + str(file))
    a = function1(file1)
    b = function2(a)
    c = function3(b)
    for d in range(1,6):
            e = function4(c, d)
    c.to_csv('output.csv')

if __name__ == '__main__'
    pool = Pool(processes=4)
    runstuff = pool.map(multip(filenames))

(认为)我想做的是让每个核心计算一个文件(可能是每个进程?)。我也做了

multiprocessing.cpu_count()

和8(我有一个四元组,所以它可能考虑到线程)。因为我总共有大约10个文件,如果我可以在每个进程中放一个文件来加快速度,那就太好了!我希望剩余的2个文件也能在第一轮中的进程完成后找到一个进程。

编辑: 为更清楚起见,函数(即函数1、函数2等)还提供给它们各自文件内的其他函数(即函数1a、函数1b)。我使用IMPORT语句调用函数1。

我收到以下错误:

OSError: Expected file path name or file-like object, got <class 'list'> type

显然不喜欢被传递一个列表,但我不想在if语句中做filename[0],因为它只运行一个文件

推荐答案

import multiprocessing
names = ['file1.csv', 'file2.csv']
def multip(name):
     [do stuff here]

if __name__ == '__main__':
    #use one less process to be a little more stable
    p = multiprocessing.Pool(processes = multiprocessing.cpu_count()-1)
    #timing it...
    start = time.time()
    for file in names:
    p.apply_async(multip, [file])

    p.close()
    p.join()
    print("Complete")
    end = time.time()
    print('total time (s)= ' + str(end-start))

编辑:将if__name__==‘_main_’替换为这个。这将运行所有文件:

if __name__ == '__main__':

    p = Pool(processes = len(names))
    start = time.time()
    async_result = p.map_async(multip, names)
    p.close()
    p.join()
    print("Complete")
    end = time.time()
    print('total time (s)= ' + str(end-start))

这篇关于具有for循环的多处理池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆