SparkSQL: conditional sum using two columns
Question
I hope you can help me with this. I have a DF as follows:
val df = sc.parallelize(Seq(
(1, "a", "2014-12-01", "2015-01-01", 100),
(2, "a", "2014-12-01", "2015-01-02", 150),
(3, "a", "2014-12-01", "2015-01-03", 120),
(4, "b", "2015-12-15", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns")
.withColumn("dateTrans", to_date($"dateTrans"))
I would like to do a groupBy on prodId and aggregate 'value', summing it over ranges of dates defined by the difference between the 'dateTrans' and 'dateIns' columns. In particular, I would like a way to define a conditional sum that adds up all values within a predefined maximum difference between those columns, i.e. all values that occurred within 10, 20, or 30 days of dateIns ('dateTrans' - 'dateIns' <= 10, 20, 30).
Is there any predefined aggregate function in Spark that allows conditional sums? Would you recommend developing an aggregation UDF (if so, any suggestions)? I'm using PySpark, but I'm very happy to get Scala solutions as well. Thanks a lot!
Recommended answer
Let's make your example a little bit more interesting, so there are some events in the window:
val df = sc.parallelize(Seq(
(1, "a", "2014-12-30", "2015-01-01", 100),
(2, "a", "2014-12-21", "2015-01-02", 150),
(3, "a", "2014-12-10", "2015-01-03", 120),
(4, "b", "2014-12-05", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns"))
.withColumn("dateTrans", to_date($"dateTrans"))
What you need is more or less something like this:
import org.apache.spark.sql.functions.{col, datediff, lit, sum}
// Find difference in tens of days
val diff = (datediff(col("dateTrans"), col("dateIns")) / 10)
.cast("integer") * 10
val dfWithDiff = df.withColumn("diff", diff)
val aggregated = dfWithDiff
.where((col("diff") < 30) && (col("diff") >= 0))
.groupBy(col("prodId"), col("diff"))
.agg(sum(col("value")))
and the result:
aggregated.show
// +------+----+----------+
// |prodId|diff|sum(value)|
// +------+----+----------+
// | a| 20| 120|
// | b| 20| 100|
// | a| 0| 100|
// | a| 10| 150|
// +------+----+----------+
where diff is a lower bound for the range (0 -> [0, 10), 10 -> [10, 20), ...). This will work in PySpark as well if you remove val and adjust the imports.
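For reference, a sketch of that PySpark translation (assuming a SparkSession named spark; same logic as the Scala snippet above):

from pyspark.sql.functions import col, datediff, sum as sum_, to_date

df = (spark.createDataFrame([
    (1, "a", "2014-12-30", "2015-01-01", 100),
    (2, "a", "2014-12-21", "2015-01-02", 150),
    (3, "a", "2014-12-10", "2015-01-03", 120),
    (4, "b", "2014-12-05", "2015-01-01", 100),
], ["id", "prodId", "dateIns", "dateTrans", "value"])
    .withColumn("dateIns", to_date(col("dateIns")))
    .withColumn("dateTrans", to_date(col("dateTrans"))))

# bucket the day difference into tens of days: 0, 10, 20, ...
diff = (datediff(col("dateTrans"), col("dateIns")) / 10).cast("integer") * 10
dfWithDiff = df.withColumn("diff", diff)

aggregated = (dfWithDiff
    .where((col("diff") < 30) & (col("diff") >= 0))
    .groupBy(col("prodId"), col("diff"))
    .agg(sum_(col("value"))))

aggregated.show()

This also defines the dfWithDiff used by the Python snippet further down.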
Edit (total per column):
import org.apache.spark.sql.functions.when

val exprs = Seq(0, 10, 20).map(x => sum(
    when(col("diff") === lit(x), col("value"))
      .otherwise(lit(0)))
    .alias(x.toString))
dfWithDiff.groupBy(col("prodId")).agg(exprs.head, exprs.tail: _*).show
// +------+---+---+---+
// |prodId| 0| 10| 20|
// +------+---+---+---+
// | a|100|150|120|
// | b| 0| 0|100|
// +------+---+---+---+
The Python equivalent:
from pyspark.sql.functions import *

def make_col(x):
    # sum 'value' only for rows in the given diff bucket, 0 otherwise
    cnd = when(col("diff") == lit(x), col("value")).otherwise(lit(0))
    return sum(cnd).alias(str(x))

exprs = [make_col(x) for x in range(0, 30, 10)]
dfWithDiff.groupBy(col("prodId")).agg(*exprs).show()
## +------+---+---+---+
## |prodId| 0| 10| 20|
## +------+---+---+---+
## | a|100|150|120|
## | b| 0| 0|100|
## +------+---+---+---+