Cumulative sum in Spark
I want to do cumulative sum in Spark. Here is the register table (input):
+---------------+-------------------+----+----+----+
| product_id| date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52|
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52|
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52|
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52|
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1|
+---------------+-------------------+----+----+----+
Hive query:
select *, SUM(val1) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val1_sum, SUM(val2) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val2_sum from test
Output:
+---------------+-------------------+----+----+----+-------+--------+
| product_id| date_time| ack|val1|val2|val_sum|val2_sum|
+---------------+-------------------+----+----+----+-------+--------+
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52| 53| 52|
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52| 106| 104|
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1| 121| 105|
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52| 53| 52|
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52| 106| 104|
+---------------+-------------------+----+----+----+-------+--------+
Using the following Spark logic, I get the same output as above:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time)
import org.apache.spark.sql.functions._
val newDf = inputDF.withColumn("val_sum", sum('val1) over w).withColumn("val2_sum", sum('val2) over w)
newDf.show
However, when I run this logic on a Spark cluster, the val_sum value is sometimes half of the expected cumulative sum, and at other times it is different again. I don't know why this happens on the cluster. Is it due to partitioning?
How can I correctly compute the cumulative sum of a column on a Spark cluster?
To get the cumulative sum using the DataFrame API, you should set the window frame with the rowsBetween method. In Spark 2.1 and newer:
val w = Window.partitionBy($"product_id", $"ack")
.orderBy($"date_time")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
This tells Spark to use the values from the beginning of the partition up to and including the current row. With older versions of Spark, use rowsBetween(Long.MinValue, 0) for the same effect.
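The frame semantics can be illustrated outside Spark as well. Below is a small plain-Python sketch (not Spark code, just an illustration) that applies the same "unbounded preceding to current row" frame to the sample rows from the question and reproduces the sums shown in the Hive output:

```python
from itertools import groupby

# Sample rows from the question: (product_id, date_time, ack, val1, val2)
rows = [
    ("4008607333T.upf", "2017-12-13:02:27:01", "3-46", 53, 52),
    ("4008607333T.upf", "2017-12-13:02:27:03", "3-47", 53, 52),
    ("4008607333T.upf", "2017-12-13:02:27:08", "3-46", 53, 52),
    ("4008607333T.upf", "2017-12-13:02:28:01", "3-47", 53, 52),
    ("4008607333T.upf", "2017-12-13:02:28:07", "3-46", 15, 1),
]

def cumulative_sums(rows):
    """Partition by (product_id, ack), order by date_time, and emit
    running sums of val1/val2 over the frame from the start of the
    partition to the current row (what rowsBetween configures)."""
    ordered = sorted(rows, key=lambda r: (r[0], r[2], r[1]))
    out = []
    for _, partition in groupby(ordered, key=lambda r: (r[0], r[2])):
        s1 = s2 = 0
        for product_id, date_time, ack, val1, val2 in partition:
            s1 += val1  # running sum of val1 within the partition
            s2 += val2  # running sum of val2 within the partition
            out.append((product_id, date_time, ack, val1, val2, s1, s2))
    return out

for row in cumulative_sums(rows):
    print(row)
```

For the 3-46 partition this yields the running sums (53, 52), (106, 104), (121, 105), matching the expected output table above.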
The window is then used the same way as before, i.e.
val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
.withColumn("val2_sum", sum($"val2").over(w))