python or dask parallel generator?


Problem Description

Is it possible in python (maybe using dask, maybe using multiprocessing) to 'emplace' generators on cores, and then, in parallel, step through the generators and process the results?

It needs to be generators in particular (or objects with __iter__); lists of all the elements the generators yield won't fit into memory.

In particular:

With pandas, I can call read_csv(...iterator=True), which gives me an iterator (TextFileReader) - I can for in it or explicitly call next multiple times. The entire csv never gets read into memory. Nice.
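
A minimal sketch of that pattern (the file name, chunk size, and expensive_process are placeholders):

    import pandas as pd

    def expensive_process(chunk):
        # stand-in for the real per-chunk computation
        return len(chunk)

    # iterator=True/chunksize returns a TextFileReader; chunks are read
    # lazily, so the whole csv never sits in memory at once
    reader = pd.read_csv("data.csv", iterator=True, chunksize=10_000)
    for chunk in reader:
        result = expensive_process(chunk)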

Every time I read a next chunk from the iterator, I also perform some expensive computation on it.

But now I have 2 such files. I would like to create 2 such generators, and 'emplace' 1 on one core and 1 on another, such that I can:

 result = expensive_process(next(iterator))

on each core, in parallel, and then combine and return the results. Repeat this step until one or both generators are exhausted.

It looks like the TextFileReader is not pickleable, nor is a generator. I can't find out how to do this in dask or multiprocessing. Is there a pattern for this?

Answer

Dask's read_csv is designed to load data from multiple files in chunks, with a chunk-size that you can specify. When you operate on the resultant dataframe, you will be working chunk-wise, which is exactly the point of using Dask in the first place. There should be no need to use your iterator method.
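
For example, a sketch of that chunk-wise workflow (the file glob, blocksize, and column name are illustrative):

    import dask.dataframe as dd

    # a glob loads both files as a single lazy dataframe; blocksize
    # (in bytes) controls how large each partition is
    df = dd.read_csv("file_*.csv", blocksize=64_000_000)

    # this only builds a task graph; no data has been read yet
    total = df["value"].sum()

    print(total.compute())  # reads and reduces the data chunk by chunk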

The dask dataframe method you will want to use, most likely, is map_partitions().
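
Roughly, assuming the same illustrative files and a hypothetical per-chunk function:

    import dask.dataframe as dd

    def expensive_process(part):
        # each partition arrives as an ordinary pandas DataFrame;
        # stand-in for the real computation
        part["score"] = part["value"] ** 2
        return part

    df = dd.read_csv("file_*.csv", blocksize=64_000_000)
    result = df.map_partitions(expensive_process)
    print(result.head())  # computes only the partitions it needs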

If you really wanted to use the iterator idea, you should look into dask.delayed, which can parallelise arbitrary python functions by sending each invocation of the function (with a different file-name for each) to your workers.
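
A sketch of that approach (the file names and helper functions are placeholders; the point is that the reader is created inside the worker, side-stepping the pickling problem):

    import dask
    import pandas as pd

    def expensive_process(chunk):
        # stand-in for the real per-chunk computation
        return len(chunk)

    @dask.delayed
    def process_file(filename):
        # the TextFileReader is built on the worker, so the
        # unpicklable iterator never crosses a process boundary
        reader = pd.read_csv(filename, iterator=True, chunksize=10_000)
        return [expensive_process(chunk) for chunk in reader]

    # one delayed call per file, executed in parallel
    results = dask.compute(*[process_file(f) for f in ["a.csv", "b.csv"]])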

