Dask opportunistic caching in custom graphs

Question

I have a custom DAG such as:

dag = {'load': (load, 'myfile.txt'),
       'heavy_comp': (heavy_comp, 'load'),
       'simple_comp_1': (sc_1, 'heavy_comp'),
       'simple_comp_2': (sc_2, 'heavy_comp'),
       'simple_comp_3': (sc_3, 'heavy_comp')}

I want to compute the keys simple_comp_1, simple_comp_2, and simple_comp_3, which I do as follows:

import dask
from dask.distributed import Client
from dask_yarn import YarnCluster

task_1 = dask.get(dag, 'simple_comp_1')
task_2 = dask.get(dag, 'simple_comp_2')
task_3 = dask.get(dag, 'simple_comp_3')
tasks = [task_1, task_2, task_3]

cluster = YarnCluster()
cluster.scale(3)
client = Client(cluster)
dask.compute(tasks)
cluster.shutdown()

It seems that, without caching, computing these 3 keys also computes heavy_comp 3 times. Since this is a heavy computation, I tried to implement opportunistic caching from here as follows:

from dask.cache import Cache
cache = Cache(2e9)
cache.register()

However, when I tried to print what was being cached, I got nothing:

>>> cache.cache.data
[]
>>> cache.cache.heap.heap
{}
>>> cache.cache.nbytes
{}

I even tried increasing the cache size to 6 GB, but to no effect. Am I doing something wrong? How can I get Dask to cache the result of the key heavy_comp?

Recommended answer

This expands on MRocklin's answer and formats the code from the comments below the question.

Computing the entire graph at once works as you would expect. heavy_comp is executed only once, which is what you want. Consider the following code, which you provided in the comments, completed here with stub function definitions:

def load(fn):
    print('load')
    return fn

def sc_1(i):
    print('sc_1')
    return i

def sc_2(i):
    print('sc_2')
    return i

def sc_3(i):
    print('sc_3')
    return i

def heavy_comp(i):
    print('heavy_comp')
    return i

def merge(*args):
    print('merge')
    return args

dag = {'load': (load, 'myfile.txt'),
       'heavy_comp': (heavy_comp, 'load'),
       'simple_comp_1': (sc_1, 'heavy_comp'),
       'simple_comp_2': (sc_2, 'heavy_comp'),
       'simple_comp_3': (sc_3, 'heavy_comp'),
       # merge must reference the graph keys, not the function names;
       # strings that are not keys are passed through as literal arguments
       'merger_comp': (merge, 'simple_comp_1', 'simple_comp_2', 'simple_comp_3')}

import dask
result = dask.get(dag, 'merger_comp')
print('result:', result)

It outputs:

load
heavy_comp
sc_1
sc_2
sc_3
merge
result: ('myfile.txt', 'myfile.txt', 'myfile.txt')


As you can see, "heavy_comp" is only printed once, showing that the function heavy_comp has only been executed once.
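If a merge step is not wanted, the same effect can probably be had by passing all three keys to a single dask.get call. The sketch below contrasts that with three separate calls. The calls counter and the string-manipulating function bodies are illustrative assumptions, not part of the original question; they exist only so the number of heavy_comp executions can be observed.

```python
import dask

# Hypothetical counter, used only to observe how often heavy_comp runs.
calls = {"heavy_comp": 0}

def load(fn):
    return fn

def heavy_comp(data):
    calls["heavy_comp"] += 1
    return data.upper()

def sc_1(x):
    return x + "_1"

def sc_2(x):
    return x + "_2"

def sc_3(x):
    return x + "_3"

dag = {'load': (load, 'myfile.txt'),
       'heavy_comp': (heavy_comp, 'load'),
       'simple_comp_1': (sc_1, 'heavy_comp'),
       'simple_comp_2': (sc_2, 'heavy_comp'),
       'simple_comp_3': (sc_3, 'heavy_comp')}

keys = ['simple_comp_1', 'simple_comp_2', 'simple_comp_3']

# Three separate calls: each call executes the graph independently,
# so heavy_comp runs once per call.
separate = [dask.get(dag, key) for key in keys]
runs_separate = calls["heavy_comp"]

# One call with a list of keys: heavy_comp runs once and its result
# is shared by all three dependents.
calls["heavy_comp"] = 0
together = dask.get(dag, keys)
runs_together = calls["heavy_comp"]

print('separate calls:', runs_separate)
print('single call:', runs_together)
```

Here dask.get is the synchronous single-machine scheduler. With a distributed Client, the analogous call should be client.get(dag, keys), which likewise receives the whole graph and all requested keys at once.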
