Python Chunking CSV File Multiprocessing


Problem description

I'm using the following code to split a CSV file into multiple chunks (sourced from here)

import csv
import itertools
import multiprocessing as mp
import time

def worker(chunk):
    print len(chunk)

def keyfunc(row):
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'Counseling.csv'
    num_chunks = 10
    start_time = time.time()
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        reader.next()  # skip the header row
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print "--- %s seconds ---" % (time.time() - start_time)

main()

However, it seems that the number of chunks always remains constant regardless of the number of chunks that I choose to use. For example, whether I choose to have 1 or 10 chunks, I always get this output when processing a sample file. Ideally, I'd like to chunk a file so that it is equitably distributed.

Note, the real file I am chunking is over 13 million rows long which is why I am processing it piece by piece. That is a must!

6
7
1
...
1
1
94
--- 0.101687192917 seconds ---


Answer

Per the comments, we wish to have each process work on a 10000-row chunk. That's not too hard to do; see the iter/islice recipe below. However, the problem with using

pool.map(worker, ten_thousand_row_chunks)

is that pool.map will attempt to put all the chunks in a task queue at once. If this requires more memory than is available then you get a MemoryError. (Note: pool.imap suffers from the same problem.)

So instead, we need to call pool.map iteratively, on pieces of each chunk.

import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
    return len(chunk)

def main():
    # num_procs is the number of workers in the pool
    num_procs = mp.cpu_count()
    # chunksize is the number of lines in a chunk
    chunksize = 10**5

    pool = mp.Pool(num_procs)
    largefile = 'Counseling.csv'
    results = []
    with open(largefile, 'rb') as f:
        reader = csv.reader(f)
        for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
            chunk = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
            result = pool.map(worker, pieces)
            results.extend(result)
    print(results)
    pool.close()
    pool.join()

main()

Each chunk will consist of up to chunksize*num_procs lines from the file. This is enough data to give all the workers in the pool something to work on, but not so large as to cause a MemoryError -- provided chunksize is not set too large.

Each chunk is then broken into pieces, with each piece consisting of up to chunksize rows from the file. These pieces are then sent to pool.map.
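As a toy illustration of this two-level split (using made-up numbers rather than the values from the answer), here is the same piece-splitting idiom applied outside the pool to a 10-row chunk with chunksize = 4:

import itertools as IT

chunksize = 4            # rows per piece (one pool.map task)
rows = list(range(10))   # stand-in for one chunk read from the file

chunk = iter(rows)
pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
print(pieces)            # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

Each piece holds at most chunksize rows, and pool.map hands one piece to each worker.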

How does iter(lambda: list(IT.islice(iterator, chunksize)), []) work:

This is an idiom for grouping an iterator into chunks of length chunksize. Let's see how it works on an example:

In [111]: iterator = iter(range(10))

Notice that each time IT.islice(iterator, 3) is called, a new chunk of 3 items is sliced off of the iterator:

In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]

In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]

In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]

When there are fewer than 3 items left in the iterator, only what remains is returned:

In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]

If you call it again, you get an empty list:

In [116]: list(IT.islice(iterator, 3))
Out[116]: []

lambda: list(IT.islice(iterator, chunksize)) is a function which returns list(IT.islice(iterator, chunksize)) when called. It is a "one-liner" which is equivalent to

def func():
    return  list(IT.islice(iterator, chunksize))



Finally, iter(callable, sentinel) returns another iterator. The values yielded by this iterator are the values returned by the callable. It keeps on yielding values until the callable returns a value equal to the sentinel. So

iter(lambda: list(IT.islice(iterator, chunksize)), [])

will keep on returning the values list(IT.islice(iterator, chunksize)) until that value is the empty list:

In [121]: iterator = iter(range(10))

In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
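As an aside (not part of the original answer), the same two-argument iter(callable, sentinel) form is useful whenever a callable should be called repeatedly until it returns a sentinel value. A common example is reading a file in fixed-size blocks:

with open('Counseling.csv', 'rb') as f:
    # f.read(4096) returns at most 4096 bytes; at end of file it returns
    # the empty bytes object b'', which matches the sentinel and stops the loop
    for block in iter(lambda: f.read(4096), b''):
        print(len(block))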
