How to include non-aggregated columns after aggregation?


Problem Description

I am using spark-sql-2.4.1v. I have a scenario like the one below:

// Imports needed to run this snippet (assuming a SparkSession named `spark`, as in spark-shell)
import org.apache.spark.sql.functions.{col, lit, sum, year, quarter}
import org.apache.spark.sql.types.{DoubleType, IntegerType}
import spark.implicits._

val df = Seq(
  (2010,"2018-11-24",71285,"USA","0.9192019", "0.1992019", "0.9955999"),
  (2010,"2017-08-24",71286,"USA","0.9292018", "0.2992019", "0.99662018"),
  (2010,"2019-02-24",71287,"USA","0.9392017", "0.3992019", "0.99772000"))
  .toDF("seq_id","load_date","company_id","country_code","item1_value","item2_value","item3_value")
  .withColumn("item1_value", $"item1_value".cast(DoubleType))
  .withColumn("item2_value", $"item2_value".cast(DoubleType))
  .withColumn("item3_value", $"item3_value".cast(DoubleType))
  .withColumn("fiscal_year", year(col("load_date")).cast(IntegerType))
  .withColumn("fiscal_quarter", quarter(col("load_date")).cast(IntegerType))

df.show()


val aggregateColumns = Seq("item1_value","item2_value","item3_value")
// Build one aggregated DataFrame per item column, then union them all
val aggDFs = aggregateColumns.map { c =>
  df.groupBy("country_code").agg(lit(c).as("col_name"), sum(c).as("sum_of_column"))
}

val combinedDF = aggDFs.reduce(_ union _)
combinedDF.show

The output data I am getting looks like this:

+------------+-----------+------------------+
|country_code|   col_name|     sum_of_column|
+------------+-----------+------------------+
|         USA|item1_value|         2.7876054|
|         USA|item2_value|         0.8976057|
|         USA|item3_value|2.9899400800000002|
+------------+-----------+------------------+

I need to add additional columns, i.e. "seq_id", "load_date" and "company_id", to the output. How can I get them after the dataframe aggregation operation?

Recommended Answer

You can use Window functions to keep the non-aggregated columns, i.e. show the per-country sum on every row.

Please see the code snippet below:

// Imports needed to run this snippet (assuming a SparkSession named `spark`, as in spark-shell)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum, year, quarter}
import org.apache.spark.sql.types.{DoubleType, IntegerType}
import spark.implicits._

val df = Seq(
  (2010,"2018-11-24",71285,"USA","0.9192019",  "0.1992019",  "0.9955999"),
  (2010,"2017-08-24",71286,"USA","0.9292018",  "0.2992019",  "0.99662018"),
  (2010,"2019-02-24",71287,"USA","0.9392017",  "0.3992019",  "0.99772000")).
  toDF("seq_id","load_date","company_id","country_code","item1_value","item2_value","item3_value").
  withColumn("item1_value", $"item1_value".cast(DoubleType)).
  withColumn("item2_value", $"item2_value".cast(DoubleType)).
  withColumn("item3_value", $"item3_value".cast(DoubleType)).
  withColumn("fiscal_year", year(col("load_date")).cast(IntegerType)).
  withColumn("fiscal_quarter", quarter(col("load_date")).cast(IntegerType))


val byCountry = Window.partitionBy(col("country_code"))

val aggregateColumns = Seq("item1_value","item2_value","item3_value")
// For each item column, attach its per-country sum to every row via the window
val aggDFs = aggregateColumns.map { c =>
  df.withColumn("col_name", lit(c)).withColumn("sum_country", sum(c) over byCountry)
}

val combinedDF = aggDFs.reduce(_ union _)

combinedDF.
select("seq_id","load_date","company_id","country_code","col_name","sum_country").
distinct.show(100,false)

The output is as follows:

+------+----------+----------+------------+-----------+------------------+
|seq_id|load_date |company_id|country_code|col_name   |sum_country       |
+------+----------+----------+------------+-----------+------------------+
|2010  |2019-02-24|71287     |USA         |item1_value|2.7876054         |
|2010  |2018-11-24|71285     |USA         |item1_value|2.7876054         |
|2010  |2017-08-24|71286     |USA         |item1_value|2.7876054         |
|2010  |2018-11-24|71285     |USA         |item2_value|0.8976057000000001|
|2010  |2019-02-24|71287     |USA         |item2_value|0.8976057000000001|
|2010  |2017-08-24|71286     |USA         |item2_value|0.8976057000000001|
|2010  |2019-02-24|71287     |USA         |item3_value|2.9899400800000002|
|2010  |2018-11-24|71285     |USA         |item3_value|2.9899400800000002|
|2010  |2017-08-24|71286     |USA         |item3_value|2.9899400800000002|
+------+----------+----------+------------+-----------+------------------+
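
As a side note that is not part of the original answer, here is a minimal alternative sketch, assuming the same df and aggregateColumns as above: compute the per-country sums with groupBy, then join them back to the original rows so that "seq_id", "load_date" and "company_id" are retained without a window function.

// Hypothetical alternative (assumption, not from the original answer):
// aggregate per country, then join the sums back to the original rows
// so the non-aggregated columns stay available.
val joinedDFs = aggregateColumns.map { c =>
  val sums = df.groupBy("country_code")
    .agg(lit(c).as("col_name"), sum(c).as("sum_country"))
  df.select("seq_id", "load_date", "company_id", "country_code")
    .join(sums, Seq("country_code"))
}

joinedDFs.reduce(_ union _).show(100, false)

This should give one row per original row and item column, matching the shape of the window-based output above, so no distinct is needed.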

