SparkSQL: conditional sum using two columns

Question

I hope you can help me with this. I have a DF as follows:

val df = sc.parallelize(Seq(
  (1, "a", "2014-12-01", "2015-01-01", 100), 
  (2, "a", "2014-12-01", "2015-01-02", 150),
  (3, "a", "2014-12-01", "2015-01-03", 120), 
  (4, "b", "2015-12-15", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns")
.withColumn("dateTrans", to_date($"dateTrans"))

I would like to group by prodId and aggregate 'value', summing it over ranges of dates defined by the difference between the columns 'dateIns' and 'dateTrans'. In particular, I would like a way to define a conditional sum that adds up all values within a predefined maximum difference between those two columns, i.e. all values that occurred within 10, 20 or 30 days of dateIns ('dateTrans' - 'dateIns' <= 10, 20, 30).

Is there any predefined aggregate function in Spark that allows doing conditional sums? Would you recommend developing an aggregation UDF (if so, any suggestions)? I'm using PySpark, but I'm very happy to get Scala solutions as well. Thanks a lot!
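
One common way to express a conditional sum in Spark is a sum over a when expression, which is also the idea the answer below builds on. A minimal PySpark sketch, assuming a PySpark DataFrame df with the columns above and an illustrative 10-day cutoff:

from pyspark.sql.functions import col, datediff, sum as sum_, when

# Contribute the row's value when dateTrans is within 10 days of dateIns, otherwise 0
within_10 = when(datediff(col("dateTrans"), col("dateIns")) <= 10, col("value")).otherwise(0)

df.groupBy("prodId").agg(sum_(within_10).alias("value_within_10_days")).show()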

Recommended Answer

Let's make your data a little more interesting so there are some events in the window:

val df = sc.parallelize(Seq(
  (1, "a", "2014-12-30", "2015-01-01", 100), 
  (2, "a", "2014-12-21", "2015-01-02", 150),
  (3, "a", "2014-12-10", "2015-01-03", 120), 
  (4, "b", "2014-12-05", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns"))
.withColumn("dateTrans", to_date($"dateTrans"))

What you need is more or less something like this:

import org.apache.spark.sql.functions.{col, datediff, lit, sum, when}

// Find difference in tens of days 
val diff = (datediff(col("dateTrans"), col("dateIns")) / 10)
  .cast("integer") * 10

val dfWithDiff = df.withColumn("diff", diff)

val aggregated = dfWithDiff 
  .where((col("diff") < 30) && (col("diff") >= 0))
  .groupBy(col("prodId"), col("diff"))
  .agg(sum(col("value")))

The result:

aggregated.show
// +------+----+----------+
// |prodId|diff|sum(value)|
// +------+----+----------+
// |     a|  20|       120|
// |     b|  20|       100|
// |     a|   0|       100|
// |     a|  10|       150|
// +------+----+----------+

where diff is the lower bound of each range (0 -> [0, 10), 10 -> [10, 20), ...). For example, row 2 has a 12-day gap between dateIns and dateTrans, so (12 / 10) cast to integer, multiplied by 10, gives 10, which puts it in the [10, 20) bucket. This will work in PySpark as well if you remove the vals and adjust the imports.
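
A minimal PySpark sketch of that translation, assuming the same sample data has been loaded into a PySpark DataFrame named df:

from pyspark.sql.functions import col, datediff, sum as sum_

# Bucket the day difference into tens of days: 0, 10, 20, ...
diff = (datediff(col("dateTrans"), col("dateIns")) / 10).cast("integer") * 10

dfWithDiff = df.withColumn("diff", diff)

aggregated = (dfWithDiff
    .where((col("diff") < 30) & (col("diff") >= 0))
    .groupBy("prodId", "diff")
    .agg(sum_(col("value"))))

aggregated.show()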

Edit (sum per column):

val exprs = Seq(0, 10,  20).map(x => sum(
  when(col("diff") === lit(x), col("value"))
    .otherwise(lit(0)))
    .alias(x.toString))

dfWithDiff.groupBy(col("prodId")).agg(exprs.head, exprs.tail: _*).show

// +------+---+---+---+
// |prodId|  0| 10| 20|
// +------+---+---+---+
// |     a|100|150|120|
// |     b|  0|  0|100|
// +------+---+---+---+

The Python equivalent:

from pyspark.sql.functions import *
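# After this wildcard import, `sum` refers to pyspark.sql.functions.sum
# (a Column aggregate), not Python's builtin sum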

def make_col(x):
   cnd = when(col("diff") == lit(x), col("value")).otherwise(lit(0))
   return sum(cnd).alias(str(x))

exprs = [make_col(x) for x in range(0, 30, 10)]
dfWithDiff.groupBy(col("prodId")).agg(*exprs).show()   

## +------+---+---+---+
## |prodId|  0| 10| 20|
## +------+---+---+---+
## |     a|100|150|120|
## |     b|  0|  0|100|
## +------+---+---+---+
