Pandas-style transform of grouped data on PySpark DataFrame


Problem description


If we have a Pandas data frame consisting of a column of categories and a column of values, we can remove the mean in each category by doing the following:

df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))

As far as I understand, Spark dataframes do not directly offer this group-by/transform operation (I am using PySpark on Spark 1.5.0). So, what is the best way to implement this computation?

I have tried using a group-by/join as follows:

df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)

But it is very slow since, as I understand, each category requires a full scan of the DataFrame.

I think (but have not verified) that I can speed this up a great deal if I collect the result of the group-by/mean into a dictionary, and then use that dictionary in a UDF as follows:

nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))

Is there an idiomatic way to express this type of operation without sacrificing performance?

Solution

"As I understand, each category requires a full scan of the DataFrame."

No, it doesn't. DataFrame aggregations are performed using logic similar to aggregateByKey; see DataFrame groupBy behaviour/optimization. The slower part is the join, which requires sorting/shuffling, but it still doesn't require a scan per group.
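
For intuition, the RDD-level analogue of such an aggregation looks roughly like this (a sketch only; the DataFrame API applies equivalent partial, map-side aggregation for you):

# per-category mean via aggregateByKey: each partition builds local
# (sum, count) pairs before the shuffle, so no per-group full scan is needed
pairs = df.rdd.map(lambda row: (row.Category, row.Values))
sums_counts = pairs.aggregateByKey(
    (0.0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # fold a value into the partition-local accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # merge accumulators from different partitions
means_rdd = sums_counts.mapValues(lambda s: s[0] / s[1])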

If this is the exact code you use, it is slow because you don't provide a join expression. Because of that it simply performs a Cartesian product, so it is not only inefficient but also incorrect. You want something like this:

from pyspark.sql.functions import col

means = df.groupBy("Category").mean("Values").alias("means")
df.alias("df").join(means, col("df.Category") == col("means.Category"))

"I think (but have not verified) that I can speed this up a great deal if I collect the result of the group-by/mean into a dictionary, and then use that dictionary in a UDF"

It is possible, although performance will vary on a case-by-case basis. A problem with using Python UDFs is that data has to be moved to and from Python. Still, it is definitely worth trying. You should consider using a broadcast variable for nameToMean, though.
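
A minimal sketch of that approach, assuming an active SparkContext named sc and a nameToMean dictionary built as sketched earlier:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# ship the dictionary to the executors once instead of with every task
bMeans = sc.broadcast(nameToMean)

f = lambda category, value: value - bMeans.value[category]
categoryDemeaned = udf(f, DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Values))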

"Is there an idiomatic way to express this type of operation without sacrificing performance?"

In PySpark 1.6 you can use the broadcast function:

df.alias("df").join(
    broadcast(means), col("df.Category") == col("means.Category"))

but it is not available in <= 1.5.
