DASK计算过去n天的分组滚动平均值,并分配给原始数据帧 [英] Dask calculate groupby rolling mean over the last n days and assign to original dataframe

查看:14
本文介绍了DASK计算过去n天的分组滚动平均值,并分配给原始数据帧的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正试图通过在DASK中滚动均值逻辑来复制下面的 pandas 群体。但停留在1)如何指定时间段(以天为单位)和2)如何将其分配回原始帧?

df['avg3d']=df.groupby('g')['v'].transform(lambda x: x.rolling('3D').mean())

获得如下错误:

ValueError: index must be monotonicValueError: Not all divisions are known, can't align partitionsValueError: cannot reindex from a duplicate axis

完整示例

import pandas as pd
import dask.dataframe

df1 = pd.DataFrame({'g':['a']*10,'v':range(10)},index=pd.date_range('2020-01-01',periods=10))
df2=df1.copy()
df2['g']='b'
df = pd.concat([df1,df2]).sort_index()
df['avg3d']=df.groupby('g')['v'].transform(lambda x: x.rolling('3D').mean())

ddf = dask.dataframe.from_pandas(df, npartitions=4)
# works
ddf.groupby('g')['v'].apply(lambda x: x.rolling(3).mean(), meta=('avg3d', 'f8')).compute()

# rolling time period fails
ddf.groupby('g')['v'].apply(lambda x: x.rolling('3D').mean(), meta=('avg3d', 'f8')).compute()

# how do I add it to the rest of the data??
# neither of these work
ddf['avg3d']=ddf.groupby('g')['v'].apply(lambda x: x.rolling('3D').mean(), meta=('x', 'f8'))
ddf['avg3d']=ddf.groupby('g')['v'].transform(lambda x: x.rolling(3).mean(), meta=('x', 'f8'))
ddft = ddf.merge(ddf3d)
ddf.assign(avg3d=ddf.groupby('g')['v'].transform(lambda x: x.rolling(3).mean(), meta=('x', 'f8')))

已查看

dask groupby apply then merge back to dataframe
Dask rolling function by group syntax
Compute the rolling mean over the last n days in Dask
ValueError: Not all divisions are known, can't align partitions error on dask dataframe

推荐答案

此问题源于.groupby在dASK中的当前实现。下面的答案不是完整的解决方案,但有望解释错误发生的原因。

首先,让我们确保获得true_result,我们可以将DASK结果与其进行比较:

import dask.dataframe
import pandas as pd

df1 = pd.DataFrame(
    {"g": ["a"] * 10, "v": range(10)}, index=pd.date_range("2020-01-01", periods=10)
)
df = pd.concat([df1, df1.assign(g="b")]).sort_index()

df["avg3d"] = df.groupby("g")["v"].transform(lambda x: x.rolling("3D").mean())
true_result = df["avg3d"].array

现在,运行用#works注释的代码将每次生成不同的值,即使数据或计算没有随机性来源:

ddf = dask.dataframe.from_pandas(df, npartitions=4)
# this doesn't work
dask_result_1 = ddf.groupby("g")["v"].apply(
    lambda x: x.rolling(3).mean(), meta=("avg3d", "f8")
).compute().array

# this will fail, every time for a different reason
assert all(dask_result_1 == true_result)

为什么会发生这种情况?好的,在幕后,Dask会想要打乱数据,以确保groupby变量的所有值都在单个分区中。这种混洗似乎是随机的,因此当这些值被缝合在一起时,它们可能会打乱原始顺序。

因此,解决此问题的一个快捷方法是在滚动计算之前添加排序:

# rolling time period works
avg3d_dask = (
    ddf.groupby("g")["v"]
    .apply(lambda x: x.sort_index().rolling("3D").mean(), meta=("avg3d", "f8"))
    .compute()
    .droplevel(0)
    .sort_index()
)

# this will always pass
assert all(avg3d_dask == true_result)
现在,我们如何将其添加到原始数据名中?我不知道有什么简单的方法可以做到这一点,但最困难的方法之一是计算原始DaskDataFrame的分区,然后将数据分割成适当的块并进行分配。然而,这种方法不是很健壮(或者至少需要大量特定于用例的微调),因此希望有人能为这一部分提供更好的解决方案。

这篇关于DASK计算过去n天的分组滚动平均值,并分配给原始数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆