Scala-Spark Dynamically call groupby and agg with parameter values


Question

I want to write a custom grouping and aggregation function that takes user-specified column names and a user-specified aggregation map. I do not know the column names or the aggregation map up front. I want to write a function similar to the one below, but I am new to Scala and cannot solve it.

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
  val grouped = df.groupBy(cols)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
}

And call it like this:

val listOfStrings =  List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)

How can I do this? Can anyone help me, please?

Answer

Your code is almost correct, with two issues:

  1. The return type of your function is DataFrame, but the last line is aggregated.show(), which returns Unit. Remove the call to show and return aggregated itself, or just return the result of agg directly.

  2. DataFrame.groupBy expects arguments as follows: col1: String, cols: String*. So you need to pass matching arguments: the first column, and then the rest of the columns as a varargs list. You can do that as follows: df.groupBy(cols.head, cols.tail: _*)
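The head/tail varargs pattern above is plain Scala and can be checked without Spark. A minimal sketch (the describe function is a made-up stand-in for any method declared with a (first: String, rest: String*) signature):

```scala
// A method with a varargs signature cannot take a List[String] directly,
// so the list is split into its head and an expanded tail.
def describe(first: String, rest: String*): String =
  (first +: rest).mkString(", ")

val cols = List("A", "B", "C")

// First element passed explicitly, the tail expanded with `: _*`
val result = describe(cols.head, cols.tail: _*)
println(result) // A, B, C
```

Note that this throws on an empty list (cols.head fails), so callers should ensure at least one grouping column is supplied.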

Altogether, your function would be:

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  val grouped = df.groupBy(cols.head, cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  aggregated
}

Or, a shorter equivalent version:

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
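As a quick end-to-end check, here is one way the function might be used, assuming a local SparkSession; the sample data and column names are made up for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .appName("groupAndAggregate-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}

val df = Seq(
  ("x", "p", 1, 10),
  ("x", "p", 2, 20),
  ("y", "q", 3, 30)
).toDF("A", "B", "D", "E")

// Group by A and B, sum D and count E. With the Map overload of agg,
// Spark names the resulting columns "sum(D)" and "count(E)" by default.
val result = groupAndAggregate(df, Map("D" -> "sum", "E" -> "count"), List("A", "B"))
result.show()
```

If you need nicer output column names, you would have to switch from the Map overload of agg to explicit Column expressions with .alias(...).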

If you do want to call show within your function:

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = {
  val grouped = df.groupBy(cols.head, cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
  aggregated
}
