Combine pivoted and aggregated column in PySpark DataFrame
Question
My question is related to this one. I have a PySpark DataFrame, named df, as shown below.
date | recipe | percent | volume
----------------------------------------
2019-01-01 | A | 0.03 | 53
2019-01-01 | A | 0.02 | 55
2019-01-01 | B | 0.05 | 60
2019-01-02 | A | 0.11 | 75
2019-01-02 | B | 0.06 | 64
2019-01-02 | B | 0.08 | 66
If I pivot it on recipe and aggregate both percent and volume, I get column names that concatenate recipe and the aggregated variable. I can use alias to clean things up. For example:
df.groupBy('date').pivot('recipe').agg(avg('percent').alias('percent'), avg('volume').alias('volume')).show()
date | A_percent | A_volume | B_percent | B_volume
--------------------------------------------------------
2019-01-01 | 0.025 | 54 | 0.05 | 60
2019-01-02 | 0.11 | 75 | 0.07 | 65
However, if I aggregate just one variable, say percent, the column names don't include the aggregated variable:
df.groupBy('date').pivot('recipe').agg(avg('percent').alias('percent')).show()
date | A | B
-------------------------
2019-01-01 | 0.025 | 0.05
2019-01-02 | 0.11 | 0.07
How can I set it to include the concatenated name when there is only one variable in the agg function?
Answer
According to Spark's source code, pivoting has a special branch for the single-aggregation case:
val singleAgg = aggregates.size == 1

def outputName(value: Expression, aggregate: Expression): String = {
  val stringValue = value.name
  if (singleAgg) {
    stringValue  // <-- here: the pivot value alone becomes the column name
  } else {
    val suffix = {...}
    stringValue + "_" + suffix
  }
}
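The effect of that branch can be mirrored in plain Python to see what happens to the column names (a sketch of the naming rule only, not Spark's actual code; output_name, value_name, and agg_alias are made-up names for illustration):

```python
def output_name(value_name, agg_alias, single_agg):
    # Mirrors the Scala branch above: with a single aggregation the pivot
    # value alone becomes the column name; with several aggregations Spark
    # appends a suffix derived from the aggregate expression.
    if single_agg:
        return value_name
    return value_name + "_" + agg_alias

print(output_name("A", "percent", single_agg=False))  # A_percent
print(output_name("A", "percent", single_agg=True))   # A
```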
I don't know the reason, but the only remaining option is renaming the columns. Here is a simplified version of the renaming:
def rename(identity: Set[String], suffix: String)(df: DataFrame): DataFrame = {
  val fieldNames = df.schema.fields.map(field => field.name)
  val renamed = fieldNames.map { fieldName =>
    if (identity.contains(fieldName)) fieldName
    else fieldName + suffix
  }
  df.toDF(renamed: _*)
}
Usage:
rename(Set("date"), "_percent")(pivoted).show()
+----------+---------+---------+
| date|A_percent|B_percent|
+----------+---------+---------+
|2019-01-01| 0.025| 0.05|
|2019-01-02| 0.11| 0.07|
+----------+---------+---------+
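Since the question is in PySpark, the same renaming idea can be sketched in Python as well (rename_columns is a made-up helper name; applying it to the DataFrame assumes a running SparkSession):

```python
def rename_columns(columns, identity, suffix):
    """Append `suffix` to every column name not listed in `identity`."""
    return [c if c in identity else c + suffix for c in columns]

print(rename_columns(["date", "A", "B"], {"date"}, "_percent"))
# ['date', 'A_percent', 'B_percent']

# Applying it to the pivoted DataFrame from the question (sketch; assumes
# a SparkSession and the single-aggregation pivot shown above):
# pivoted = df.groupBy('date').pivot('recipe').agg(avg('percent'))
# pivoted = pivoted.toDF(*rename_columns(pivoted.columns, {'date'}, '_percent'))
```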