Dask broadcast not available during compute graph

Problem description

I am experimenting with Dask and want to ship a lookup pandas.DataFrame to all worker nodes. Unfortunately, it fails with:

TypeError: ("'Future' object is not subscriptable", 'occurred at index 0')

When I use lookup.result()['foo'].iloc[2] instead of lookup['foo'].iloc[2], it works, but for larger input dataframes it seems to get stuck at from_pandas again and again. It also seems strange that the future has to be blocked on manually, over and over again for every row in the apply operation. Is there a way to block on the future only once per worker node? A naive improvement could be to use map_partitions, but this would only be feasible if the number of partitions is fairly small.

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})

# scatter() copies df_first to every worker and returns a Future, not a DataFrame
df_first_scattered = client.scatter(df_first, broadcast=True)
df_second_dask = dd.from_pandas(df_second, npartitions=2)


def foo(row, lookup):
    # TODO some computation which relies on the lookup
    return lookup['foo'].iloc[2]

# Raises the TypeError above: the Future is captured inside the lambda
df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))
df_second_dask = df_second_dask.compute()
df_second_dask.head()

In fact, this naive dask implementation seems to be slower than plain pandas for larger problem instances. I suspect the slow execution performance is related to the issue raised above.
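
A pandas-only sketch of the trade-off raised in the question, with no Dask involved: CountingFuture, per_row and per_partition are hypothetical helpers, and iloc slices stand in for Dask partitions. It counts how often the lookup is fetched when the future is resolved inside every row call versus once per partition (the map_partitions idea), showing that the number of fetches scales with rows in the first case and with partitions in the second.

```python
import pandas as pd


class CountingFuture:
    """Hypothetical stand-in for a future: holds a value and counts .result() calls."""

    def __init__(self, value):
        self._value = value
        self.fetches = 0

    def result(self):
        self.fetches += 1
        return self._value


def foo(row, lookup):
    # Same lookup as in the question: third value of the 'foo' column.
    return lookup['foo'].iloc[2]


def per_row(df, fut):
    # Workaround from the question: resolve the future again for every row.
    return df.apply(lambda r: foo(r, fut.result()), axis=1)


def per_partition(df, fut, npartitions):
    # map_partitions-style: resolve once per chunk, then reuse the plain DataFrame.
    step = -(-len(df) // npartitions)  # ceiling division: rows per chunk
    parts = []
    for start in range(0, len(df), step):
        lookup = fut.result()
        chunk = df.iloc[start:start + step]
        parts.append(chunk.apply(foo, args=(lookup,), axis=1))
    return pd.concat(parts)


df_first = pd.DataFrame({'foo': [1, 2, 3]})
df_second = pd.DataFrame({'bar': [1, 2, 3], 'baz': [1, 2, 3]})

row_future = CountingFuture(df_first)
part_future = CountingFuture(df_first)
by_row = per_row(df_second, row_future)
by_part = per_partition(df_second, part_future, npartitions=2)

# Fetch counts: one per row versus one per chunk.
print(row_future.fetches, part_future.fetches)
```

Both variants produce the same values; only the number of times the lookup is fetched differs.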

Recommended answer

Instead of this:

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))

Try this:

df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))

Previously you were hiding the future inside a lambda function, so Dask couldn't find it and replace it with its value. When the future is passed as an explicit argument instead, Dask recognizes it as a future and hands the underlying value to your function.
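
To illustrate the idea behind the answer, here is a toy model, not Dask's actual implementation: FakeFuture and run_task are hypothetical names. The toy scheduler swaps any FakeFuture found among a task's top-level arguments for its value, but it never looks inside closures, so a future captured by a lambda reaches foo unresolved and indexing it fails the same way as the TypeError in the question.

```python
import pandas as pd


class FakeFuture:
    """Hypothetical stand-in for a distributed Future: holds a value, is not subscriptable."""

    def __init__(self, value):
        self._value = value

    def result(self):
        return self._value


def run_task(func, *args):
    # Toy scheduler: replace top-level FakeFuture arguments with their values.
    # It never inspects closures, so a future captured by a lambda is missed.
    resolved = [a.result() if isinstance(a, FakeFuture) else a for a in args]
    return func(*resolved)


def foo(row, lookup):
    return lookup['foo'].iloc[2]


lookup_future = FakeFuture(pd.DataFrame({'foo': [1, 2, 3]}))
row = pd.Series({'bar': 1, 'baz': 1})

# Future passed as an explicit argument, as in the answer: it gets resolved.
print(run_task(foo, row, lookup_future))  # prints 3

# Future hidden inside a lambda, as in the question: foo receives the FakeFuture itself.
try:
    run_task(lambda x: foo(x, lookup_future), row)
except TypeError as exc:
    print("TypeError:", exc)  # 'FakeFuture' object is not subscriptable
```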
