Multiple aggregate operations on the same column of a Spark DataFrame


Problem description

I have three arrays of string type containing the following information:

  • groupBy array: contains the names of the columns I want to group the data by.
  • aggregate array: contains the names of the columns I want to aggregate.
  • operations array: contains the aggregate operations I want to perform.

I am trying to use Spark DataFrames to achieve this. Spark DataFrames provide an agg() to which you can pass a Map[String, String] (of column name and the respective aggregate operation) as input; however, I want to perform different aggregation operations on the same column of the data. Any suggestions on how to achieve this?
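For context, here is a minimal sketch (assuming a spark-shell session and the same toy data used in the answer below) of why the Map-based agg cannot express this: Map keys are unique, so a second operation on the same column silently replaces the first.

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")

// The duplicate key wins: this is equivalent to Map("v" -> "max"),
// so min(v) is silently dropped.
df.groupBy("k").agg(Map("v" -> "min", "v" -> "max")).show
// +---+------+
// |  k|max(v)|
// +---+------+
// |  1|   3.0|
// |  2|  -5.0|
// +---+------+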

Answer

Scala:

You can, for example, map over a list of functions with a defined mapping from name to function:

import org.apache.spark.sql.functions.{col, min, max, avg}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+

// Equivalent, using the String-based groupBy overload:
df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show
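
If you prefer predictable output names over the generated min(v), max(v), avg(v), a small variation (a sketch, not part of the original answer) is to alias each expression, mirroring the naming used in the SQL version below:

// Same mapping as above, but each expression gets an explicit alias.
val named = aggregate.flatMap(c =>
  operations.map(f => mapping(f)(col(c)).as(s"${c}_$f")))

df.groupBy(groupBy.map(col): _*).agg(named.head, named.tail: _*).show
// The columns now come out as k, v_min, v_max, v_mean.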

Unfortunately, the parser used internally by SQLContext is not exposed publicly, but you can always try to build plain SQL queries:

df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")

Python:

from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"]
funs = [mean, sum, max]

# Build one Column expression per (function, column) pair
exprs = [f(col(c)) for f in funs for c in aggregate]

# or the equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)

