Aggregate (Sum) over Window for a list of Columns
Question
I'm having trouble finding a generic way to calculate the Sum (or any aggregate function) over a given window, for a list of columns available in the DataFrame.
import spark.implicits._

val inputDF = spark
  .sparkContext
  .parallelize(
    Seq(
      (1, 2, 1, 30, 100),
      (1, 2, 2, 30, 100),
      (1, 2, 3, 30, 100),
      (11, 21, 1, 30, 100),
      (11, 21, 2, 30, 100),
      (11, 21, 3, 30, 100)
    ),
    10)
  .toDF("c1", "c2", "offset", "v1", "v2")

inputDF.show
+---+---+------+---+---+
| c1| c2|offset| v1| v2|
+---+---+------+---+---+
| 1| 2| 1| 30|100|
| 1| 2| 2| 30|100|
| 1| 2| 3| 30|100|
| 11| 21| 1| 30|100|
| 11| 21| 2| 30|100|
| 11| 21| 3| 30|100|
+---+---+------+---+---+
Given a DataFrame as shown above, it's easy to find the Sum for a list of columns, similar to the code snippet shown below -
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))
val aggKey = List("v1", "v2").map(c => sum(c).alias(c.trim))

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val outputDF = inputDF
  .groupBy(groupKey: _*)
  .agg(aggKey.head, aggKey.tail: _*)

outputDF.show
But I can't seem to find a similar approach for aggregate functions over a window spec. So far I've only been able to solve this by specifying each column individually, as shown below -
val outputDF2 = inputDF
  .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w))
  .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w))
I'd appreciate it if there is a way to do this aggregation over a dynamic list of columns. Thanks!
Answer
I think I found an approach that works better than the one stated in the problem above.
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

/**
 * Utility method that takes a DataFrame and a list of columns, and returns aggregated values for the specified list of columns
 * @param colsToAggregate Seq[String] of all columns in the input DataFrame to be aggregated
 * @param inputDF input DataFrame
 * @param f aggregate function, 'call by name'
 * @param partitionByColSeq Seq[String] of column names to partition the inputDF by before applying the aggregate
 * @param orderByColSeq Seq[String] of column names to order the inputDF by before applying the aggregate
 * @param name_prefix String to prefix the new columns with, to avoid collisions
 * @param name function mapping an input column name to the new column name; defaults to identity, reusing the aggregated column's name
 * @return output DataFrame
 */
def withRollingAggregateColumns(colsToAggregate: Seq[String],
                                inputDF: DataFrame,
                                f: String => Column,
                                partitionByColSeq: Seq[String],
                                orderByColSeq: Seq[String],
                                name_prefix: String,
                                name: String => String = identity): DataFrame = {
  val groupByKey = partitionByColSeq.map(x => col(x.trim))
  val orderByKey = orderByColSeq.map(x => col(x.trim))

  import org.apache.spark.sql.expressions.Window
  val w = Window.partitionBy(groupByKey: _*).orderBy(orderByKey: _*)

  colsToAggregate
    .foldLeft(inputDF) { (df, elementInCols) =>
      df.withColumn(
        name_prefix + "_" + name(elementInCols),
        f(elementInCols).over(w)
      )
    }
}
In this case, the utility method takes a DataFrame as input and appends new columns based on the provided function f. It uses withColumn inside a foldLeft to iterate over the list of columns that need to be aggregated. To avoid column name collisions, it prepends a user-provided prefix to each new aggregate column.
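As a quick sanity check, here is a minimal sketch of how the method can be invoked against the question's sample data. It assumes a local SparkSession; the method body is repeated so the snippet is self-contained, and the `"cumulative"` prefix and `RollingAggDemo`-style setup are illustrative choices, not part of the original answer:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("rolling-agg-demo").getOrCreate()
import spark.implicits._

def withRollingAggregateColumns(colsToAggregate: Seq[String],
                                inputDF: DataFrame,
                                f: String => Column,
                                partitionByColSeq: Seq[String],
                                orderByColSeq: Seq[String],
                                name_prefix: String,
                                name: String => String = identity): DataFrame = {
  val w = Window
    .partitionBy(partitionByColSeq.map(col): _*)
    .orderBy(orderByColSeq.map(col): _*)
  colsToAggregate.foldLeft(inputDF) { (df, c) =>
    df.withColumn(name_prefix + "_" + name(c), f(c).over(w))
  }
}

val inputDF = Seq(
  (1, 2, 1, 30, 100), (1, 2, 2, 30, 100), (1, 2, 3, 30, 100),
  (11, 21, 1, 30, 100), (11, 21, 2, 30, 100), (11, 21, 3, 30, 100)
).toDF("c1", "c2", "offset", "v1", "v2")

// Running sum of v1 and v2 within each (c1, c2) group, ordered by offset.
// With an ordered window and no explicit frame, Spark defaults to
// "unbounded preceding to current row", so each sum is cumulative:
// cumulative_v1 grows 30, 60, 90 and cumulative_v2 grows 100, 200, 300 per group.
val out = withRollingAggregateColumns(
  Seq("v1", "v2"), inputDF, c => sum(col(c)),
  Seq("c1", "c2"), Seq("offset"), "cumulative")

out.orderBy("c1", "offset").show()
```

Because `f` is just a `String => Column`, swapping `sum` for `avg`, `min`, or `max` requires no change to the utility itself.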