python or dask parallel generator?
Question
Is it possible in python (maybe using dask, maybe using multiprocessing) to 'emplace' generators on cores, and then, in parallel, step through the generators and process the results?
It needs to be generators in particular (or objects with __iter__); lists of all the elements the generators yield won't fit into memory.
In particular:
With pandas, I can call read_csv(..., iterator=True), which gives me an iterator (TextFileReader). I can for over it or explicitly call next multiple times. The entire csv never gets read into memory. Nice.
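A minimal sketch of that chunked reading, using an in-memory CSV so the example is self-contained (the data and chunk size are made up for illustration):

```python
import io

import pandas as pd

# An in-memory CSV stands in for the real file; in practice this would be a path.
csv_data = io.StringIO("a,b\n1,2\n3,4\n5,6\n7,8\n")

# chunksize gives a TextFileReader: chunks are read lazily, so the whole
# file never has to fit in memory.
reader = pd.read_csv(csv_data, chunksize=2)

# Stand-in for the expensive per-chunk computation: sum column "a".
sums = [chunk["a"].sum() for chunk in reader]
```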
Every time I read the next chunk from the iterator, I also perform some expensive computation on it.
But now I have 2 such files. I would like to create 2 such generators, and 'emplace' 1 on one core and 1 on another, such that I can:
result = expensive_process(next(iterator))
on each core, in parallel, and then combine and return the results. Repeat until one or both generators are exhausted.
It looks like the TextFileReader is not pickleable, nor is a generator. I can't figure out how to do this in dask or multiprocessing. Is there a pattern for this?
Answer
Dask's read_csv is designed to load data from multiple files in chunks, with a chunk-size that you can specify. When you operate on the resultant dataframe, you will be working chunk-wise, which is exactly the point of using Dask in the first place. There should be no need to use your iterator method.
The dask dataframe method you will most likely want is map_partitions().
If you really wanted to use the iterator idea, you should look into dask.delayed, which can parallelise arbitrary python functions by sending each invocation of the function (with a different file name for each) to your workers.
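A rough sketch of that pattern; expensive_process here is a trivial placeholder, where a real version would open its file and step through it chunk by chunk before returning a small combined result:

```python
import dask

@dask.delayed
def expensive_process(filename):
    # Placeholder: a real version would read the file and reduce it
    # to a small result that fits in memory.
    return len(filename)

# One delayed call per file; dask.compute runs them in parallel on workers.
results = dask.compute(expensive_process("a.csv"), expensive_process("bb.csv"))
```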