Python Chunking CSV File Multiprocessing
Question
I'm using the following code to split a CSV file into multiple chunks (sourced from here)
import csv
import itertools
import multiprocessing as mp
import time

def worker(chunk):
    print(len(chunk))

def keyfunc(row):
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'Counseling.csv'
    num_chunks = 10
    start_time = time.time()
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print("--- %s seconds ---" % (time.time() - start_time))
However, it seems that the number of chunks always remains constant regardless of the number of chunks that I choose to use. For example, whether I choose to have 1 or 10 chunks, I always get this output when processing a sample file. Ideally, I'd like to chunk a file so that it is equitably distributed.
Note, the real file I am chunking is over 13 million rows long which is why I am processing it piece by piece. That is a must!
6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
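(The uneven counts above are exactly what itertools.groupby produces: it does not make fixed-size chunks, it groups *consecutive* rows whose key — here row[0] — compares equal, so the group sizes are dictated by the data, not by num_chunks. A minimal sketch with made-up rows:)

```python
import itertools

# groupby batches consecutive items with equal keys, so group sizes
# depend entirely on the data, not on any requested chunk count.
rows = [['a', 1], ['a', 2], ['b', 3], ['a', 4]]
groups = [(key, list(group))
          for key, group in itertools.groupby(rows, key=lambda r: r[0])]
print(groups)
# → [('a', [['a', 1], ['a', 2]]), ('b', [['b', 3]]), ('a', [['a', 4]])]
# Note 'a' appears twice: groupby starts a new group every time the key changes.
```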
Answer
Per the comments, we wish to have each process work on a 10000-row chunk. That's not too hard to do; see the iter/islice recipe below. However, the problem with using
pool.map(worker, ten_thousand_row_chunks)
is that pool.map will attempt to put all the chunks in a task queue at once. If this requires more memory than is available, then you get a MemoryError. (Note: pool.imap suffers from the same problem.)
So instead, we need to call pool.map iteratively, on pieces of each chunk.
import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
    return len(chunk)

def main():
    # num_procs is the number of workers in the pool
    num_procs = mp.cpu_count()
    # chunksize is the number of lines in a chunk
    chunksize = 10**5

    pool = mp.Pool(num_procs)
    largefile = 'Counseling.csv'
    results = []
    # newline='' is the Python 3 way to open a file for the csv module
    with open(largefile, 'r', newline='') as f:
        reader = csv.reader(f)
        for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
            chunk = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
            result = pool.map(worker, pieces)
            results.extend(result)
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    # the guard is required so worker processes can safely re-import this module
    main()
Each chunk will consist of up to chunksize*num_procs lines from the file. This is enough data to give all the workers in the pool something to work on, but not so big as to cause a MemoryError -- provided chunksize is not set too large.
Each chunk is then broken into pieces, with each piece consisting of up to chunksize rows from the file. These pieces are then sent to pool.map.
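On a small scale, with plain integers standing in for rows, the chunk-to-pieces split looks like this (a piece size of 3 plays the role of chunksize):

```python
import itertools as IT

chunk = list(range(7))   # stands in for one chunk of chunksize*num_procs rows
chunk_iter = iter(chunk)
# slice off pieces of up to 3 items until the iterator is exhausted
pieces = list(iter(lambda: list(IT.islice(chunk_iter, 3)), []))
print(pieces)  # → [[0, 1, 2], [3, 4, 5], [6]]
```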
How does iter(lambda: list(IT.islice(iterator, chunksize)), []) work:
This is an idiom for grouping an iterator into chunks of length chunksize. Let's see how it works on an example:
In [111]: iterator = iter(range(10))
Notice that each time IT.islice(iterator, 3) is called, a new chunk of 3 items is sliced off of the iterator:
In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]
In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]
In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]
When there are fewer than 3 items left in the iterator, only what remains is returned:
In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]
And if you call it again, you get an empty list:
In [116]: list(IT.islice(iterator, 3))
Out[116]: []
lambda: list(IT.islice(iterator, chunksize)) is a function which returns list(IT.islice(iterator, chunksize)) when called. It is a "one-liner" which is equivalent to
def func():
    return list(IT.islice(iterator, chunksize))
Finally, iter(callable, sentinel) returns another iterator. The values yielded by this iterator are the values returned by the callable. It keeps on yielding values until the callable returns a value equal to the sentinel. So
iter(lambda: list(IT.islice(iterator, chunksize)), []) will keep on returning the values list(IT.islice(iterator, chunksize)) until that value is the empty list:
In [121]: iterator = iter(range(10))
In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
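If you find the inline lambda hard to read, the same idiom can be packaged as a small helper generator (a sketch; the name chunked is hypothetical and not part of the answer's code):

```python
import itertools as IT

def chunked(iterable, size):
    """Yield successive lists of up to `size` items from `iterable`."""
    it = iter(iterable)
    # iter(callable, sentinel) stops once islice yields an empty list
    for batch in iter(lambda: list(IT.islice(it, size)), []):
        yield batch

print(list(chunked(range(10), 3)))  # → [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```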