How to run generator code in parallel?


Problem description

I have code like this:

def generator():
    while True:
        x = ...  # do slow calculation
        yield x

I would like to move the slow calculation to separate process(es).

I'm working in Python 3.6, so I have concurrent.futures.ProcessPoolExecutor. It's just not obvious how to concurrent-ize a generator using it.

The difference from a regular concurrent scenario using map is that there is nothing to map here (the generator runs forever), and we don't want all the results at once; we want to queue them up and wait until the queue is not full before calculating more results.

I don't have to use concurrent.futures; multiprocessing is fine too. It's a similar problem: it's not obvious how to use it inside a generator.
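
One way to do this with just the standard library is to have a worker process push results into a bounded multiprocessing.Queue that the generator drains; put() blocks once the queue is full, which gives exactly the back-pressure described above. This is a minimal sketch: slow_calculation, the worker function, and the finite item count are illustrative placeholders (the real generator would run forever).

```python
import multiprocessing as mp

def slow_calculation(i):
    # placeholder for the real expensive work
    return i * i

def worker(queue, n_items):
    # put() blocks once the queue holds `maxsize` items,
    # so the worker pauses instead of racing ahead
    for i in range(n_items):
        queue.put(slow_calculation(i))
    queue.put(None)  # sentinel: no more results

def generator(n_items=10, maxsize=4):
    queue = mp.Queue(maxsize=maxsize)
    proc = mp.Process(target=worker, args=(queue, n_items), daemon=True)
    proc.start()
    while True:
        item = queue.get()
        if item is None:
            break
        yield item
    proc.join()

if __name__ == "__main__":
    print(list(generator(5)))  # [0, 1, 4, 9, 16]
```

With a single worker the FIFO queue preserves result order; with several workers you would need to reattach indices if order matters.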

Slight twist: each value returned by the generator is a large numpy array (10 megabytes or so). How do I transfer that without pickling and unpickling? I've seen the docs for multiprocessing.Array, but it's not totally obvious how to transfer a numpy array with it.

Recommended answer

In this type of situation I usually use the joblib library. It is a parallel computation framework based on multiprocessing, and it supports memmapping precisely for the cases where you have to handle large numpy arrays. I believe it is worth checking out for your case.

Maybe joblib's documentation is not explicit enough on this point, showing only examples with for loops. Since you want to use a generator, I should point out that it does work with generators. An example that would achieve what you want is the following:

from joblib import Parallel, delayed

def my_long_running_job(x):
    # do something with x
    ...

# you can customize the number of jobs
Parallel(n_jobs=4)(delayed(my_long_running_job)(x) for x in generator())

I don't know what kind of processing you want to do, but if it releases the GIL you could also consider using threads. That way you avoid having to transfer large numpy arrays between processes while still benefiting from true parallelism.
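
As a sketch of that thread-based variant (again with placeholder names and a finite item count for demonstration), the same bounded-queue pattern works with queue.Queue and a ThreadPoolExecutor. Since all threads share one address space, the arrays are never copied at all; this pays off when the slow calculation spends its time in code that releases the GIL, such as numpy/BLAS routines or I/O.

```python
from concurrent.futures import ThreadPoolExecutor
import queue

def slow_calculation(i):
    # NumPy ufuncs, BLAS calls, and I/O release the GIL,
    # so a thread doing this kind of work runs truly in parallel
    return i * i

def generator(n_items=10, maxsize=4):
    q = queue.Queue(maxsize=maxsize)  # bounded: producer blocks when full

    def producer():
        for i in range(n_items):
            q.put(slow_calculation(i))
        q.put(None)  # sentinel: no more results

    with ThreadPoolExecutor(max_workers=1) as pool:
        pool.submit(producer)
        while True:
            item = q.get()
            if item is None:
                break
            yield item

print(list(generator(5)))  # [0, 1, 4, 9, 16]
```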
