Cumulative sum in Spark

Problem Description

I want to compute a cumulative sum in Spark. Here is the registered table (input):

+---------------+-------------------+----+----+----+
|     product_id|          date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46|  53|  52|
|4008607333T.upf|2017-12-13:02:27:03|3-47|  53|  52|
|4008607333T.upf|2017-12-13:02:27:08|3-46|  53|  52|
|4008607333T.upf|2017-12-13:02:28:01|3-47|  53|  52|
|4008607333T.upf|2017-12-13:02:28:07|3-46|  15|   1|
+---------------+-------------------+----+----+----+

Hive query:

select *,
       SUM(val1) over (partition by product_id, ack order by date_time
                       rows between unbounded preceding and current row) val1_sum,
       SUM(val2) over (partition by product_id, ack order by date_time
                       rows between unbounded preceding and current row) val2_sum
from test

Output:

+---------------+-------------------+----+----+----+-------+--------+
|     product_id|          date_time| ack|val1|val2|val_sum|val2_sum|
+---------------+-------------------+----+----+----+-------+--------+
|4008607333T.upf|2017-12-13:02:27:01|3-46|  53|  52|     53|      52|
|4008607333T.upf|2017-12-13:02:27:08|3-46|  53|  52|    106|     104|
|4008607333T.upf|2017-12-13:02:28:07|3-46|  15|   1|    121|     105|
|4008607333T.upf|2017-12-13:02:27:03|3-47|  53|  52|     53|      52|
|4008607333T.upf|2017-12-13:02:28:01|3-47|  53|  52|    106|     104|
+---------------+-------------------+----+----+----+-------+--------+

Using the following Spark logic, I get the same output as shown above:

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time)
import org.apache.spark.sql.functions._

val newDf = inputDF.withColumn("val_sum", sum('val1) over w).withColumn("val2_sum", sum('val2) over w)
newDf.show

However, when I run this logic on a Spark cluster, the val_sum value comes out as half of the cumulative sum, and sometimes as a different value altogether. I don't know why this happens on the cluster. Is it due to partitioning?

How can I compute the cumulative sum of a column on a Spark cluster?

Solution

To get the cumulative sum using the DataFrame API, you should define the window frame explicitly with the rowsBetween method. In Spark 2.1 and newer:

val w = Window.partitionBy($"product_id", $"ack")
  .orderBy($"date_time")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

This tells Spark to sum the values from the beginning of the partition up to the current row. On older versions of Spark, use rowsBetween(Long.MinValue, 0) for the same effect.
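
For those older releases, the window definition would look roughly like the sketch below; this is a minimal illustration using the same columns as above, and the name wLegacy is only used here for clarity:

// Pre-2.1: the Window.unboundedPreceding / Window.currentRow constants do not exist,
// so the frame bounds are passed as raw Long values
val wLegacy = Window.partitionBy($"product_id", $"ack")
  .orderBy($"date_time")
  .rowsBetween(Long.MinValue, 0)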

The window is then applied the same way as before, i.e.

val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
  .withColumn("val2_sum", sum($"val2").over(w))
