How to apply a function to multiple columns of a Dask Data Frame in parallel?

Problem description

I have a Dask DataFrame and would like to compute the skewness of a list of columns. If a column's skewness exceeds a certain threshold, I correct it with a log transformation. Is there a more efficient way to make correct_skewness() work on multiple columns in parallel, by removing the for loop in the function below?

import dask
import dask.array as da 
from scipy import stats

# Create a dataframe 
df = dask.datasets.timeseries()

df.head()

                      id     name         x         y
timestamp
2000-01-01 00:00:00  1032   Oliver  0.018604  0.089191
2000-01-01 00:00:01  1032  Norbert  0.666689 -0.979374
2000-01-01 00:00:02   991   Victor  0.027691 -0.474660
2000-01-01 00:00:03   979    Kevin  0.320067  0.656949
2000-01-01 00:00:04  1087    Zelda -0.462076  0.513409


def correct_skewness(columns=None, max_skewness=2):
    if columns is None:
        raise ValueError(
            f"columns argument is None. Please set columns argument to a list of columns"
        )


    for col in columns:
        skewness = stats.skew(df[col])
        max_val = df[col].max().compute()
        min_val = df[col].min().compute()

        if abs(skewness) > max_skewness and (max_val > 1 or min_val < 0):
            delta = 1.0
            if min_val < 0:
                delta = max(1, -min_val + 1)
            df[col] = da.log(delta + df[col])
    return df

df = correct_skewness(columns=['x', 'y']) 

Recommended answer

There are a couple of things you can do to improve parallelism in this example:

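You can use dask.array.stats.skew rather than scipy.stats.skew (the function the question calls), so that the skewness stays a lazy Dask result instead of being computed immediately. You will have to import dask.array.stats explicitly.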

You can compute the min/max of all columns in one computation

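    import dask.array.stats  # must be imported explicitly for da.stats.skew

    mins = [df[col].min() for col in cols]
    maxes = [df[col].max() for col in cols]
    skews = [da.stats.skew(df[col]) for col in cols]

    # A single dask.compute call evaluates all three lists together
    mins, maxes, skews = dask.compute(mins, maxes, skews)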

Then you could do your if-logic and apply da.log as appropriate. This still requires two passes over your data, but that should be a nice improvement over what you have now.
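
Once the minimum, maximum, and skewness of each column are plain numbers, the if-logic from the question no longer touches Dask and can be written as a small pure function. The sketch below is one way to do that, not a definitive implementation: the helper name `log_shift` and the sample statistics are made up for illustration, and the threshold default of 2 is taken from the question.

```python
from typing import Optional


def log_shift(min_val: float, max_val: float, skewness: float,
              max_skewness: float = 2.0) -> Optional[float]:
    """Return the delta to add before taking the log, or None to skip the column.

    Hypothetical helper: it mirrors the condition used in the question's
    correct_skewness(), applied to already-computed scalars.
    """
    if abs(skewness) <= max_skewness:
        return None  # not skewed enough to correct
    if not (max_val > 1 or min_val < 0):
        return None  # values already lie in [0, 1]; the question leaves these alone
    if min_val < 0:
        # Shift so that the smallest value maps to 1 and the log stays defined.
        return max(1.0, 1.0 - min_val)
    return 1.0


# Made-up (min, max, skew) triples standing in for the computed results.
stats_by_col = {
    "x": (-0.99, 0.99, 0.1),
    "y": (-3.0, 50.0, 2.7),
    "z": (0.0, 0.8, 3.0),
    "w": (0.0, 10.0, -2.5),
}
shifts = {col: log_shift(*vals) for col, vals in stats_by_col.items()}
print(shifts)
```

For every column where the helper returns a delta, the transformation itself is the same assignment the question already uses, `df[col] = da.log(delta + df[col])`, which is the second pass over the data.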
