Distributing graphs across cluster nodes


Problem Description

I'm making good progress with Dask.delayed. As a group, we've decided to put more time into working with graphs using Dask.

I have a question about distribution. I'm seeing the following behaviour on our cluster. I start up, e.g., 8 workers on each of 8 nodes, each with 4 threads, say. I then client.compute 8 graphs to create the simulated data for subsequent processing. I want the 8 data sets generated one per node. However, what seems to happen, not unreasonably, is that the eight functions are run on the first two nodes. Subsequent computations also run on the first and second nodes, so I see a lack of scaling. Over time, the other nodes disappear from the diagnostics workers page. Is this expected?

So I want to distribute the data creation functions by node first. When I want to compute the graphs, I now do:

# Restrict the computation to specific workers when `nodes` is given
if nodes is not None:
    print("Computing graph_list on the following nodes: %s" % nodes)
    return client.compute(graph_list, sync=True, workers=nodes, **kwargs)
else:
    return client.compute(graph_list, sync=True, **kwargs)

This seems to set up correctly: the diagnostics progress bar shows that my data creation functions are in memory, but they do not start. If nodes is omitted, the computation proceeds as expected. This behaviour occurs both on the cluster and on my desktop.
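For illustration only, here is a minimal sketch of how the worker addresses the scheduler actually knows about can be listed and passed to workers=; it assumes the client and graph_list from the snippet above, and the one-graph-per-worker pinning via zip is purely an illustrative assumption, not code from the question.

from dask.distributed import wait

# Addresses of the workers currently registered with the scheduler,
# e.g. 'tcp://10.143.6.71:46656'; workers= restricts tasks to these workers.
worker_addresses = list(client.scheduler_info()["workers"])

# Illustrative: pin one graph per worker
# (assumes len(graph_list) <= number of workers)
futures = [client.compute(graph, workers=[addr])
           for graph, addr in zip(graph_list, worker_addresses)]
wait(futures)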

More info: looking at the scheduler log, I do see communication failures.

more dask-ssh_2017-09-04_09\:52\:09/dask_scheduler_sand-6-70\:8786.log
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:    tcp://10.143.6.70:8786
distributed.scheduler - INFO -       bokeh at:              0.0.0.0:8787
distributed.scheduler - INFO -        http at:              0.0.0.0:9786
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-ny4ev7qh
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Register tcp://10.143.6.73:36810
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.73:36810
distributed.scheduler - INFO - Register tcp://10.143.6.71:46656
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.71:46656
distributed.scheduler - INFO - Register tcp://10.143.7.66:42162
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.66:42162
distributed.scheduler - INFO - Register tcp://10.143.7.65:35114
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.65:35114
distributed.scheduler - INFO - Register tcp://10.143.6.70:43208
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.70:43208
distributed.scheduler - INFO - Register tcp://10.143.7.67:45228
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.67:45228
distributed.scheduler - INFO - Register tcp://10.143.6.72:36100
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.72:36100
distributed.scheduler - INFO - Register tcp://10.143.7.68:41915
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.68:41915
distributed.scheduler - INFO - Receive client connection: 5d1dab2a-914e-11e7-8bd1-180373ff6d8b
distributed.scheduler - INFO - Worker 'tcp://10.143.6.71:46656' failed from closed comm: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://10.143.6.71:46656
distributed.scheduler - INFO - Removed worker tcp://10.143.6.71:46656
distributed.scheduler - INFO - Worker 'tcp://10.143.6.73:36810' failed from closed comm: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://10.143.6.73:36810
distributed.scheduler - INFO - Removed worker tcp://10.143.6.73:36810
distributed.scheduler - INFO - Worker 'tcp://10.143.6.72:36100' failed from closed comm: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://10.143.6.72:36100
distributed.scheduler - INFO - Removed worker tcp://10.143.6.72:36100
distributed.scheduler - INFO - Worker 'tcp://10.143.7.67:45228' failed from closed comm: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://10.143.7.67:45228
distributed.scheduler - INFO - Removed worker tcp://10.143.7.67:45228
(arlenv) [hpccorn1@login-sand8 performance]$

Thanks, Tim

Answer

How Dask chooses to allocate tasks to workers is complex, and takes into account many issues such as load balancing, data transfer, resource constraints, etc. It can be hard to reason about where things will end up without a concrete and simple example.

One thing you can try is to submit all of your computations at once; this lets the scheduler make slightly more intelligent decisions rather than seeing things one at a time.

So, you might try replacing code like this:

# One compute call per delayed value
futures = [client.compute(delayed_value) for delayed_value in L]
wait(futures)

with code like this:

# A single compute call for the whole list
futures = client.compute(L)
wait(futures)

But honestly, I only give this a 30% chance of solving your problem. It's hard to know what is going on without diving much more deeply into your problem. If you can provide a very simple reproducible code example, that would be best.
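For reference, a minimal, self-contained sketch of the "submit everything at once" pattern on a LocalCluster; the make_dataset function, the cluster size, and the who_has check are illustrative assumptions, not taken from the question.

import dask
from dask.distributed import Client, LocalCluster, wait

def make_dataset(i):
    # Stand-in for the real data-creation function
    return [i] * 1000

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=8, threads_per_worker=4)
    client = Client(cluster)

    # Build all delayed graphs up front
    graph_list = [dask.delayed(make_dataset)(i) for i in range(8)]

    # Submit the whole list in one call so the scheduler sees all the work at once
    futures = client.compute(graph_list)
    wait(futures)

    # Inspect which worker holds each result
    print(client.who_has(futures))

    client.close()
    cluster.close()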
