Chunking data from a large file for multiprocessing?
Question
I'm trying to parallelize an application using multiprocessing which takes in a very large csv file (64MB to 500MB), does some work line by line, and then outputs a small, fixed-size file.
Currently I do a list(file_obj), which unfortunately is loaded entirely into memory (I think). I then break that list up into n parts, n being the number of processes I want to run, and do a pool.map() on the broken-up lists.
This seems to have a really, really bad runtime in comparison to a single-threaded, just-open-the-file-and-iterate-over-it methodology. Can someone suggest a better solution?
Additionally, I need to process the rows of the file in groups which preserve the value of a certain column. These groups of rows can themselves be split up, but no group should contain more than one value for this column.
Answer
list(file_obj) can require a lot of memory when file_obj is large. We can reduce that memory requirement by using itertools to pull out chunks of lines as we need them.
In particular, we can use

    reader = csv.reader(f)
    chunks = itertools.groupby(reader, keyfunc)

to split the file into processable chunks, and

    groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
    result = pool.map(worker, groups)

to have the multiprocessing pool work on num_chunks chunks at a time.
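The islice step is what bounds memory: it pulls at most num_chunks (key, group) pairs from the lazy groupby iterator per round. A small sketch of that batching loop on in-memory data (names mirror the snippet above; no file or pool needed for the illustration):

```python
import itertools

# Toy rows; first element is the grouping key. Keys a..e, with
# equal keys adjacent, as groupby requires.
rows = [['a'], ['a'], ['b'], ['c'], ['c'], ['d'], ['e']]
chunks = itertools.groupby(rows, key=lambda r: r[0])
num_chunks = 2

batches = []
while True:
    # Materialize at most num_chunks groups per round. Each chunk is
    # listed immediately, before groupby advances to the next key.
    groups = [list(chunk) for key, chunk in
              itertools.islice(chunks, num_chunks)]
    if not groups:
        break
    batches.append(groups)
print(batches)
# [[[['a'], ['a']], [['b']]], [[['c'], ['c']], [['d']]], [[['e']]]]
```

Note that each chunk must be listed before advancing to the next key, because groupby invalidates a group iterator once the next group is requested; the list comprehension above does this in the right order.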
By doing so, we need roughly only enough memory to hold a few (num_chunks) chunks in memory, instead of the whole file.
    import multiprocessing as mp
    import itertools
    import csv

    def worker(chunk):
        # `chunk` is a list of CSV rows that all share the same value
        # in the key column. Replace this with your real computation.
        return len(chunk)

    def keyfunc(row):
        # `row` is one row of the CSV file.
        # Replace this to return your grouping column.
        return row[0]

    def main():
        pool = mp.Pool()
        largefile = 'test.dat'
        num_chunks = 10
        results = []
        with open(largefile) as f:
            reader = csv.reader(f)
            chunks = itertools.groupby(reader, keyfunc)
            while True:
                # make a list of up to num_chunks chunks
                groups = [list(chunk) for key, chunk in
                          itertools.islice(chunks, num_chunks)]
                if groups:
                    result = pool.map(worker, groups)
                    results.extend(result)
                else:
                    break
        pool.close()
        pool.join()
        print(results)

    if __name__ == '__main__':
        main()
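One caveat worth spelling out (an addition, not part of the original answer): itertools.groupby only groups *consecutive* rows, so rows sharing the same key-column value must be adjacent in the file, e.g. because the file is sorted on that column. A hypothetical helper for generating a compatible test.dat to try the script against:

```python
import csv

def make_test_file(path, keys_and_counts):
    # Hypothetical helper: writes a CSV whose first column is the
    # grouping key, with equal keys on adjacent rows as groupby requires.
    with open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        for key, count in keys_and_counts:
            for i in range(count):
                writer.writerow([key, i])

# 3 'alpha' rows, 2 'beta' rows, 4 'gamma' rows -> 3 chunks for the pool.
make_test_file('test.dat', [('alpha', 3), ('beta', 2), ('gamma', 4)])
```

With this file, the script above would hand the pool three chunks of 3, 2, and 4 rows and print [3, 2, 4].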