How to efficiently submit tasks with large arguments in Dask distributed?
Question
I want to submit functions with Dask that have large (gigabyte scale) arguments. What is the best way to do this? I want to run this function many times with different (small) parameters.
This uses the concurrent.futures interface. We could use the dask.delayed interface just as easily.
import numpy as np
from dask.distributed import Client

x = np.random.random(size=100000000)  # 800 MB array
params = list(range(100))  # 100 small parameters

def f(x, param):
    pass  # placeholder for the real computation

c = Client()
futures = [c.submit(f, x, param) for param in params]  # x is embedded in every task
But this is slower than I would expect or results in memory errors.
Recommended answer
OK, so what's wrong here is that each task contains the numpy array x, which is large. For each of the 100 tasks that we submit, we need to serialize x, send it up to the scheduler, send it over to the worker, and so on.
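To get a feel for that cost, here is a rough sketch that measures one serialized copy of the array and multiplies it by the number of tasks. It assumes plain pickle as a stand-in (Dask uses its own serialization layer, so real numbers will differ), and the array is scaled down to about 8 MB so it runs quickly. The names payload_bytes and total_bytes are made up for this illustration.

```python
import pickle

import numpy as np

# Hypothetical, scaled-down stand-in for the 800 MB array in the question.
x = np.random.random(size=1_000_000)  # about 8 MB
params = list(range(100))

# Plain pickle is only a rough proxy for Dask's own serialization.
payload_bytes = len(pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL))

# If x travels with every task, the client ships it once per parameter.
total_bytes = payload_bytes * len(params)

print(f"one copy of x:      {payload_bytes / 1e6:.1f} MB")
print(f"x sent with each of {len(params)} tasks: {total_bytes / 1e6:.1f} MB")
```

With the full 800 MB array the same arithmetic gives roughly 80 GB of traffic for 100 tasks, which is consistent with the slowness and memory errors described in the question.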
Instead, we'll send the array up to the cluster once:
[future] = c.scatter([x])
Now future is a token that points to an array x that lives on the cluster. We can submit tasks that refer to this remote future instead of the numpy array on our local client.
# futures = [c.submit(f, x, param) for param in params] # sends x each time
futures = [c.submit(f, future, param) for param in params] # refers to remote x already on cluster
This is now much faster, and lets Dask control data movement more effectively.
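Putting the pieces together, a minimal end-to-end sketch might look like the following. It makes a few assumptions that are not in the original: an in-process cluster (processes=False) to keep it lightweight, a small array of ones in place of the large one, and a made-up body for f that scales the array sum by the parameter.

```python
import numpy as np
from dask.distributed import Client


def f(x, param):
    # Hypothetical workload: scale the array sum by the small parameter.
    return float(x.sum() * param)


# In-process cluster (threads only), chosen just to keep the sketch light.
c = Client(processes=False)

x = np.ones(1_000_000)  # small stand-in for the large array
params = list(range(5))

[future] = c.scatter([x])  # x is sent to the cluster once
futures = [c.submit(f, future, param) for param in params]
results = c.gather(futures)  # only the small results come back to the client

c.close()
print(results)
```

Inside f the argument arrives as the actual numpy array, not as a future, so the function body does not need to change when you switch from passing x to passing future.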
If you expect to need to move the array x to all workers eventually, then you may want to broadcast the array to start with:
[future] = c.scatter([x], broadcast=True)
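One way to check where the data ended up is to ask the scheduler which workers hold the key. This is a small sketch under stated assumptions: a two-worker in-process cluster and a small array, both chosen only for illustration. With broadcast=True, both workers would be expected to appear.

```python
import numpy as np
from dask.distributed import Client

# Two in-process workers; the cluster size is hypothetical and kept small.
c = Client(n_workers=2, threads_per_worker=1, processes=False)

x = np.ones(1_000_000)  # small stand-in for the large array
[future] = c.scatter([x], broadcast=True)

# who_has maps each key to the workers that currently hold its data.
holders = c.who_has([future])
n_copies = len(holders[future.key])

c.close()
print(f"x is held by {n_copies} worker(s)")
```

Broadcasting costs one copy of the array per worker in memory, so it pays off mainly when most workers will end up running tasks on x anyway.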
Use Dask Delayed
Futures work fine with dask.delayed as well. There is no performance benefit here, but some people prefer this interface:
# futures = [c.submit(f, future, param) for param in params]
from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)