Python multiprocessing on a generator that reads files in

Problem description

I am trying to read and process 1000s of files, but unfortunately it takes about 3x as long to process the file as it does to read it in from disk, so I would like to process these files as they are read in (and while I am continuing to read in additional files).

In a perfect world, I have a generator which reads one file at a time, and I would like to pass this generator to a pool of workers which process items from the generator as they are (slowly) generated.

Here's an example:

import os
from multiprocessing import Pool

def process_file(file_string):
     ...
     return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

The only issue with the code above is that all the files are read into memory before the pool begins, which means that I need to wait for the disk to read everything in, and I also consume a large amount of memory.

Recommended answer

Pool.map and Pool.map_async listify the iterable passed to them, so your generator will always be realized fully before processing even begins.
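
A quick way to see this is a generator that logs each item it yields (a minimal sketch, not from the original answer; square and noisy_gen are made-up names): Pool.map itself consumes the whole generator in the parent process, so every "producing" line prints before any result comes back.

from multiprocessing import Pool

def square(x):
    return x * x

def noisy_gen():
    # Logs each item as it is pulled from the generator.
    for i in range(5):
        print('producing', i)
        yield i

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # map() turns noisy_gen() into a list up front, so all five
        # "producing" lines appear before the results are printed.
        print(pool.map(square, noisy_gen()))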

The various Pool.imap* functions appear to process inputs as generators, so you might be able to change:

results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

to:

# If you can process outputs one at a time, drop the list wrapper
# If you can process outputs without order mattering, imap_unordered will
# get you the best results
results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))

and get the same results without slurping before processing, but AFAICT, they'll still try to fully populate the queues as fast as they can, which could lead to a lot of data outstanding and excessive memory usage; beyond that, you'll be reading all the data in one process, then sending all of it over IPC, which means you're still mostly bottlenecked on I/O.
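
As the comment in the snippet above notes, if you can handle outputs one at a time and don't care about order, imap_unordered lets you consume results as workers finish them; a rough sketch, assuming pool, path, and process_file from the earlier snippets (handle_result is a hypothetical placeholder for whatever you do with each output):

# Results are yielded in completion order, not submission order, and are
# handled as they arrive instead of being collected into a list first.
for processed in pool.imap_unordered(
        process_file,
        (open(path + part, 'rb').read() for part in os.listdir(path))):
    handle_result(processed)  # hypothetical: do something with each output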

In your position, I'd move the read into the task itself (and if you can, avoid reading in the whole file, processing it by line or by block instead of reading the whole thing at once). You'd get parallel reads, less IPC, and you won't risk slurping all the files before the first few are even processed; you'll never have more files open than you have workers. So the end result would look like:

def process_file(path):
    with open(path, 'rb') as f:
        file_string = f.read()
    ... same as before ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))
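
If the work can be done line by line (or block by block), as suggested above, the worker itself could stream the file instead of slurping it; a rough sketch under that assumption (process_line is a hypothetical stand-in for the real per-line processing):

import os
from multiprocessing import Pool

def process_line(line):
    # Hypothetical per-line work; replace with the real processing.
    return len(line)

def process_file(file_path):
    # Read the file line by line inside the worker instead of loading it
    # all at once; only the per-line results accumulate in memory.
    with open(file_path, 'rb') as f:
        return [process_line(line) for line in f]

if __name__ == '__main__':
    pool = Pool(processes=4)
    path = 'some/path/'
    results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))
    for processed in results:
        pass  # consume each file's results as it finishes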
