Aggregate (Sum) over Window for a list of Columns

Problem Description

I'm having trouble finding a generic way to calculate the Sum (or any aggregate function) over a given window, for a list of columns available in the DataFrame.

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for .toDF and the $"..." column syntax

val inputDF = spark
  .sparkContext
  .parallelize(
    Seq(
      (1, 2, 1, 30, 100),
      (1, 2, 2, 30, 100),
      (1, 2, 3, 30, 100),
      (11, 21, 1, 30, 100),
      (11, 21, 2, 30, 100),
      (11, 21, 3, 30, 100)
    ),
    10)
  .toDF("c1", "c2", "offset", "v1", "v2")

inputDF.show
+---+---+------+---+---+
| c1| c2|offset| v1| v2|
+---+---+------+---+---+
|  1|  2|     1| 30|100|
|  1|  2|     2| 30|100|
|  1|  2|     3| 30|100|
| 11| 21|     1| 30|100|
| 11| 21|     2| 30|100|
| 11| 21|     3| 30|100|
+---+---+------+---+---+

Given a DataFrame as shown above, it's easy to find the Sum for a list of columns, similar to the code snippet shown below -

val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))

val aggKey = List("v1", "v2").map(c => sum(c).alias(c.trim))

import org.apache.spark.sql.expressions.Window

// Window spec, reused by the per-column approach further below
val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val outputDF = inputDF
  .groupBy(groupKey: _*)
  .agg(aggKey.head, aggKey.tail: _*)

outputDF.show
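
For reference, with the sample data above the grouped aggregation should produce output along these lines (each group has three rows with v1 = 30 and v2 = 100, so the sums are 90 and 300; row order may vary):

+---+---+---+---+
| c1| c2| v1| v2|
+---+---+---+---+
|  1|  2| 90|300|
| 11| 21| 90|300|
+---+---+---+---+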

But I can't seem to find a similar approach for aggregate functions over a window spec. So far I've only been able to solve this by specifying each column individually as shown below -

// One hard-coded withColumn call per column/window-range combination
val outputDF2 = inputDF
    .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w))
    .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w))

I'd appreciate it if there is a way to do this aggregation over a dynamic list of columns. Thanks!

Answer

I think I found an approach that works better than the one stated in the above problem.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

/**
  * Utility method that takes a DataFrame and a list of columns, and returns the DataFrame with a rolling aggregate column appended for each column in the list
  * @param colsToAggregate    Seq[String] of all columns in the input DataFrame to be aggregated
  * @param inputDF            input DataFrame
  * @param f                  aggregate function (String => Column), applied to each column name
  * @param partitionByColSeq  Seq[] of column names to partition the inputDF by before applying the aggregate
  * @param orderByColSeq      Seq[] of column names to order the inputDF by before applying the aggregate
  * @param name_prefix        String to prefix the new columns with, to avoid collisions
  * @param name               mapping to new column names; defaults to the identity function, reusing the aggregated column names
  * @return                   output DataFrame
  */
def withRollingAggregateColumns(colsToAggregate: Seq[String],
                                inputDF: DataFrame,
                                f: String => Column,
                                partitionByColSeq: Seq[String],
                                orderByColSeq: Seq[String],
                                name_prefix: String,
                                name: String => String = identity): DataFrame = {

  val groupByKey = partitionByColSeq.map(x => col(x.trim))
  val orderByKey = orderByColSeq.map(x => col(x.trim))

  val w = Window.partitionBy(groupByKey: _*).orderBy(orderByKey: _*)

  // Fold over the column list, appending one aggregated column per element
  colsToAggregate
    .foldLeft(inputDF)(
      (df, elementInCols) => df
        .withColumn(
          name_prefix + "_" + name(elementInCols),
          f(elementInCols).over(w)
        )
    )
}
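
As a minimal usage sketch, reusing the inputDF from the question (the column choices and the "cumulative" prefix here are illustrative):

// assumes import org.apache.spark.sql.functions.{sum, col}
val rollingDF = withRollingAggregateColumns(
  colsToAggregate = Seq("v1", "v2"),
  inputDF = inputDF,
  f = c => sum(col(c)), // any String => Column aggregate could be passed in
  partitionByColSeq = Seq("c1", "c2"),
  orderByColSeq = Seq("offset"),
  name_prefix = "cumulative"
)

rollingDF.show

The bounded variant from the question could presumably be recovered by instead passing something like f = c => sum(when(col("offset").between(-1, 1), col(c)).otherwise(0)).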

In this case, the utility method takes a DataFrame as input and appends new columns based on the provided function f. It uses the "withColumn" and "foldLeft" idiom to iterate over the list of columns which need to be aggregated. To avoid any column name collisions, it prepends a user-provided prefix to the new aggregate columns.
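
Since the window spec has an orderBy, Spark's default frame is a running one (unbounded preceding to current row), so the usage sketch above should yield per-partition running sums, roughly:

+---+---+------+---+---+-------------+-------------+
| c1| c2|offset| v1| v2|cumulative_v1|cumulative_v2|
+---+---+------+---+---+-------------+-------------+
|  1|  2|     1| 30|100|           30|          100|
|  1|  2|     2| 30|100|           60|          200|
|  1|  2|     3| 30|100|           90|          300|
| 11| 21|     1| 30|100|           30|          100|
| 11| 21|     2| 30|100|           60|          200|
| 11| 21|     3| 30|100|           90|          300|
+---+---+------+---+---+-------------+-------------+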

