Parallelize tree creation with dask


Question

I need help with a problem that I'm fairly sure dask can solve, but I don't know how to tackle it.

I need to construct a tree recursively.

For each node, if a criterion is met, a computation (compute_val) is performed; otherwise two new children are created, and the same treatment (build) is applied to each child. Once all the children of a node have been built, a merge step (merge) runs: it either fuses the children into the parent (if both hold a value and together meet a criterion) or leaves them attached as children. So far I have only been able to parallelize the first level, and I don't know which dask tools I should use to be more effective. Here is a simplified sequential MRE of what I want to achieve:

import numpy as np
import time

class Node:
    def __init__(self, level):
        self.level = level
        self.val = None

def merge(node, childs):
    values = [child.val for child in childs]
    if all(values) and sum(values)<0.1:
        node.val = np.mean(values)
    else:
        node.childs = childs
    return node        

def compute_val():
    time.sleep(0.1)
    return np.random.rand(1)

def build(node):
    print(node.level)
    if (np.random.rand(1) < 0.1 and node.level>1) or node.level>5:
        node.val = compute_val()
    else:
        childs = [build(Node(level=node.level+1)) for _ in range(2)]
        node = merge(node, childs)
    return node

tree = build(Node(level=0))

Recommended answer

As I understand it, the way to tackle recursion (or any dynamic computation) in dask is to create tasks from within a task.

I was experimenting with something similar, so below is my five-minute illustrative solution. You will have to optimise it according to the characteristics of your algorithm.

Keep in mind that every task adds scheduling overhead, so you will want to chunk the computation (do more work per task) for best results.

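Related documentation and API reference: the dask.distributed page on launching tasks from tasks, and the worker_client entry in its API reference.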

import numpy as np
import time
from dask.distributed import Client, worker_client

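# Create a dask client.
# For convenience, this starts a LocalCluster with 8 single-threaded workers.
# When run as a script rather than in a notebook, create the client and submit
# the work under `if __name__ == "__main__":`.
client = Client(threads_per_worker=1, n_workers=8)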

class Node:
    def __init__(self, level):
        self.level = level
        self.val = None
        self.childs = None   # This was missing

def merge(node, childs):
    values = [child.val for child in childs]
    if all(values) and sum(values)<0.1:
        node.val = np.mean(values)
    else:
        node.childs = childs
    return node        

def compute_val():
    time.sleep(0.1)            # Is this required?
    return np.random.rand(1)

def build(node):
    print(node.level)
    if (np.random.rand(1) < 0.1 and node.level>1) or node.level>5:
        node.val = compute_val()
    else:
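        # worker_client() lets a running task submit new tasks and frees its
        # thread slot on the worker while it waits for them.
        with worker_client() as client:
            # build is random, so pass pure=False; otherwise the two identical
            # submit calls may be given the same key and collapse into one task.
            child_futures = [client.submit(build, Node(level=node.level+1), pure=False) for _ in range(2)]
            childs = client.gather(child_futures)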
        node = merge(node, childs)
    return node

tree_future = client.submit(build, Node(level=0))
tree = tree_future.result()
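
The chunking advice above can be sketched as follows. This is only one possible approach, not part of the original answer: nodes above a depth cutoff each become a dask task, while every subtree at or below the cutoff is built sequentially inside a single task. The names build_sequential, build_chunked, is_leaf, count_leaves, cutoff and max_level are hypothetical, compute_val is simplified to return a plain float with a shorter sleep, and merge uses an explicit None check instead of all(values).

```python
import time
import numpy as np

class Node:
    def __init__(self, level):
        self.level = level
        self.val = None
        self.childs = None

def merge(node, childs):
    values = [child.val for child in childs]
    # Explicit None check (assumption): a computed value of 0.0 still counts.
    if all(v is not None for v in values) and sum(values) < 0.1:
        node.val = float(np.mean(values))
    else:
        node.childs = childs
    return node

def compute_val():
    time.sleep(0.01)  # stand-in for the expensive computation
    return float(np.random.rand())

def is_leaf(node, max_level):
    # Same stopping criterion as the MRE, with the depth limit as a parameter.
    return (np.random.rand() < 0.1 and node.level > 1) or node.level > max_level

def build_sequential(node, max_level=5):
    """Build a whole subtree inside the current task; no new dask tasks."""
    if is_leaf(node, max_level):
        node.val = compute_val()
        return node
    childs = [build_sequential(Node(node.level + 1), max_level) for _ in range(2)]
    return merge(node, childs)

def build_chunked(node, cutoff=2, max_level=5):
    """One dask task per node above `cutoff`; deeper subtrees run in-process."""
    if node.level >= cutoff:
        return build_sequential(node, max_level)
    if is_leaf(node, max_level):
        node.val = compute_val()
        return node
    # Imported lazily so the sequential helpers work without a running cluster.
    from dask.distributed import worker_client
    with worker_client() as client:
        # pure=False: the function is random, so identical calls must not share a key.
        futures = [
            client.submit(build_chunked, Node(node.level + 1), cutoff, max_level, pure=False)
            for _ in range(2)
        ]
        childs = client.gather(futures)
    return merge(node, childs)

def count_leaves(node):
    """Count nodes without children (computed or merged nodes)."""
    if node.childs is None:
        return 1
    return sum(count_leaves(child) for child in node.childs)
```

To run it, submit build_chunked from the client in the same way as build above, for example client.submit(build_chunked, Node(level=0), pure=False).result(). With cutoff=2 the run creates at most 7 tasks (the root, its two children, and four level-2 subtrees) instead of one per node; a larger cutoff gives more parallelism at the price of more scheduling overhead, so the right value depends on how expensive compute_val really is.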
