Override dask scheduler to concurrently load data on multiple workers

Problem description

I want to run graphs/futures on my distributed cluster that all have a 'load data' root task and then a bunch of training tasks that run on that data. A simplified version would look like this:

from dask.distributed import Client

client = Client(scheduler_ip)  # scheduler_ip: address of the running distributed scheduler

# one root task loads the data, then one training task per parameter set reuses it
load_data_future = client.submit(load_data_func, 'path/to/data/')
train_task_futures = [client.submit(train_func, load_data_future, params)
                      for params in train_param_set]

Running this as above, the scheduler gets one worker to read the file, and that data is then spilled to disk to share it with the other workers. However, loading the data usually means reading from a large HDF5 file, which can be done concurrently, so I was wondering whether there is a way to force all workers to read this file concurrently (i.e. they all compute the root task) instead of having them wait for one worker to finish and then slowly transfer the data from that worker.

I know there is the client.run() method, which I can use to get all workers to read the file concurrently, but how would you then feed the data you have read into the downstream tasks?
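For context, this is roughly what the client.run() route looks like (a minimal sketch, assuming the same load_data_func as above and a hypothetical helper read_locally): the results come back to the client as a plain dict keyed by worker address, not as scheduler-tracked keys, so they cannot be handed to client.submit() as futures.

def read_locally():
    # runs once on every worker; the return value is shipped back to the client
    return load_data_func('path/to/data/')

per_worker_data = client.run(read_locally)  # {worker_address: data, ...}
# per_worker_data is ordinary client-side data, not a set of futures, so it cannot
# be passed to client.submit(train_func, ...) without scattering it again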

I cannot use the dask data primitives to concurrently read HDF5 files because I need things like multi-indexes and grouping on multiple columns.
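To make that constraint concrete, the loader might look something like the sketch below; this is purely illustrative, and the HDF5 key and column names ('table', 'symbol', 'date') are assumptions, not part of the original question.

import pandas as pd

def load_data_func(path):
    # hypothetical loader: a plain pandas read of the HDF5 store, followed by the
    # multi-column groupby and MultiIndex that dask.dataframe does not easily support
    df = pd.read_hdf(path, key='table')
    df = df.groupby(['symbol', 'date'], as_index=False).mean()
    return df.set_index(['symbol', 'date']).sort_index()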

Recommended answer

I revisited this question and found a relatively simple solution, though it uses internal API methods and involves a blocking call to client.run(). Using the same variables as in the question:

from distributed import get_worker

client_id = client.id

def load_dataset():
    worker = get_worker()  # the worker this function is currently running on
    # load the data locally and register it on the worker under an explicit key
    data = {'load_dataset-0': load_data_func('path/to/data')}
    info = worker.update_data(data=data, report=False)
    # tell the scheduler which worker holds the key so futures can reference it
    worker.scheduler.update_data(who_has={key: [worker.address] for key in data},
                                 nbytes=info['nbytes'], client=client_id)

client.run(load_dataset)

Now if you run client.has_what() you should see that each worker holds the key load_dataset-0. To use this in downstream computations you can simply create a future for the key:

from distributed import Future

# wrap the scheduler-tracked key 'load_dataset-0' in a Future owned by this client
load_data_future = Future('load_dataset-0', client=client)

and this can be used with client.compute() or dask.delayed as usual. Indeed the final line from the example in the question would work fine:

train_task_futures = [client.submit(train_func, load_data_future, params) 
                      for params in train_param_set]
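For the dask.delayed route mentioned above, a sketch along the following lines should also work (same train_func and train_param_set as in the question); futures embedded in a delayed graph are resolved by the distributed scheduler when the graph is computed.

import dask

# build lazy training tasks that reuse the pre-loaded data, then submit them all at once
lazy_tasks = [dask.delayed(train_func)(load_data_future, params)
              for params in train_param_set]
train_task_futures = client.compute(lazy_tasks)  # one Future per parameter set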

Bear in mind that this uses the internal API methods Worker.update_data and Scheduler.update_data, and that it works fine as of distributed.__version__ == 1.21.6 but could be subject to change in future releases.
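Since this leans on internal APIs, a small runtime check against version drift may be worthwhile; the sketch below is not part of the original answer.

import warnings
import distributed

# the workaround above was only verified against distributed 1.21.6
if distributed.__version__ != '1.21.6':
    warnings.warn('Worker/Scheduler.update_data are internal APIs; this workaround was '
                  'verified on distributed 1.21.6, found %s' % distributed.__version__)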
