How to efficiently submit tasks with large arguments in Dask distributed?

Question

I want to submit functions with Dask that have large (gigabyte-scale) arguments. What is the best way to do this? I want to run the function many times, each time with a different (small) parameter.

The example below uses the concurrent.futures-style interface (Client.submit). The dask.delayed interface would work just as easily.

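import numpy as np
from dask.distributed import Client

x = np.random.random(size=100000000)  # 800MB array
params = list(range(100))             # 100 small parameters

def f(x, param):
    pass  # placeholder for the real computation

c = Client()  # with no arguments, this starts a local cluster

futures = [c.submit(f, x, param) for param in params]  # x is passed directly to every task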

But this is either slower than I would expect or results in memory errors.

Recommended answer

The problem is that each task contains the NumPy array x, which is large. For each of the 100 tasks we submit, x has to be serialized, sent up to the scheduler, and then sent on to a worker.
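To get a feel for the cost, here is a back-of-the-envelope estimate, assuming every task really does carry its own copy of x as described above. It ignores serialization overhead, so treat the numbers as a rough lower bound rather than a measurement.

```python
# Rough estimate of the data shipped when x is embedded in every task.
n_elements = 100_000_000   # size of x in the question
bytes_per_element = 8      # np.random.random returns float64 values
n_tasks = 100              # one task per parameter

bytes_per_task = n_elements * bytes_per_element
total_bytes = bytes_per_task * n_tasks

print(f"per task: {bytes_per_task / 1e6:.0f} MB")  # per task: 800 MB
print(f"total:    {total_bytes / 1e9:.0f} GB")     # total:    80 GB
```

Sending the array once instead reduces the client-side transfer to a single 800 MB upload.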

Instead, we'll send the array up to the cluster once:

[future] = c.scatter([x])

Now future is a token that points to the array x living on the cluster. We can submit tasks that refer to this remote future instead of the NumPy array on our local client.

# futures = [c.submit(f, x, param) for param in params]  # sends x each time
futures = [c.submit(f, future, param) for param in params]  # refers to remote x already on cluster

This is now much faster, and it lets Dask control data movement more effectively.
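Putting the pieces together, a minimal end-to-end sketch might look like the following. The body of f, the helper name run_with_scatter, the small array size, and the in-process cluster (processes=False) are all assumptions chosen so the example runs quickly on one machine; they are not part of the original question.

```python
import numpy as np
from dask.distributed import Client


def f(x, param):
    # Hypothetical stand-in for the real computation: scale the array sum.
    return float(x.sum()) * param


def run_with_scatter(n=1_000, n_params=5):
    x = np.ones(n)                   # small array so the sketch runs quickly
    params = list(range(n_params))
    # processes=False keeps the scheduler and workers inside this process.
    with Client(processes=False) as client:
        [x_future] = client.scatter([x])   # send x to the cluster once
        futures = [client.submit(f, x_future, p) for p in params]
        return client.gather(futures)      # bring the small results back


print(run_with_scatter())
```

Only the small results travel back to the client; the array itself stays on the cluster after the single scatter call.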

If you expect that the array x will eventually be needed on all workers, you may want to broadcast it at the start:

[future] = c.scatter([x], broadcast=True)
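One way to see where the data ended up is Client.who_has, which reports the workers holding each key. The sketch below is illustrative only: the two-worker in-process cluster, the body of f, and the helper name run_with_broadcast are assumptions made for the example.

```python
import numpy as np
from dask.distributed import Client


def f(x, param):
    # Hypothetical stand-in for the real computation.
    return float(x.sum()) * param


def run_with_broadcast(n=1_000, n_params=4):
    x = np.ones(n)
    # Two single-threaded workers inside this process (an assumption for the demo).
    with Client(processes=False, n_workers=2, threads_per_worker=1) as client:
        [x_future] = client.scatter([x], broadcast=True)  # copy x to the workers
        who_has = client.who_has([x_future])              # key -> worker addresses
        holders = next(iter(who_has.values()))
        futures = [client.submit(f, x_future, p) for p in range(n_params)]
        results = client.gather(futures)
    return len(holders), results


n_holders, results = run_with_broadcast()
print("workers holding x:", n_holders)
print("results:", results)
```

Without broadcast=True, the array starts on a subset of workers and Dask copies it to others as tasks require it.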

Use Dask Delayed

Futures work fine with dask.delayed as well. There is no performance benefit here, but some people prefer this interface:

# futures = [c.submit(f, future, param) for param in params]

from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)
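For completeness, here is a self-contained sketch of the delayed variant, gathering the results at the end. As before, the body of f, the helper name run_with_delayed, the array size, and the in-process cluster are assumptions for illustration.

```python
import numpy as np
from dask import delayed
from dask.distributed import Client


def f(x, param):
    # Hypothetical stand-in for the real computation.
    return float(x.sum()) * param


def run_with_delayed(n=1_000, n_params=5):
    x = np.ones(n)
    with Client(processes=False) as client:
        [x_future] = client.scatter([x])  # x still goes to the cluster only once
        # Build lazy tasks that reference the remote future, not the local array.
        lazy_values = [delayed(f)(x_future, p) for p in range(n_params)]
        futures = client.compute(lazy_values)  # start computing, get futures back
        return client.gather(futures)


print(run_with_delayed())
```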
