Compare Value of Current and Previous Row in Spark


Question

I am trying to compare the current row with the previous row in the DataFrame below. I want to calculate the AMOUNT column.

scala> val dataset = sc.parallelize(Seq((1, 123, 50), (2, 456, 30), (3, 456, 70), (4, 789, 80))).toDF("SL_NO","ID","AMOUNT")

scala> dataset.show
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    70|
|    4|789|    80|
+-----+---+------+

Calculation logic:

  1. For row 1, AMOUNT should be 50, taken from the first row.
  2. For row 2, if the IDs of SL_NO 2 and SL_NO 1 are not the same, take the AMOUNT of SL_NO 2 (i.e. 30). Otherwise, take the AMOUNT of SL_NO 1 (i.e. 50).
  3. For row 3, if the IDs of SL_NO 3 and SL_NO 2 are not the same, take the AMOUNT of SL_NO 3 (i.e. 70). Otherwise, take the AMOUNT of SL_NO 2 (i.e. 30).

The same logic needs to be applied to the remaining rows as well.

Expected output:

+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    30|
|    4|789|    80|
+-----+---+------+

Please help.

Answer

You could use lag with when.otherwise; here is a demonstration:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, when}

// A window over the whole dataset, ordered by SL_NO, so that
// lag(..., 1) refers to the immediately preceding row.
val w = Window.orderBy($"SL_NO")

dataset.withColumn("AMOUNT",
    when($"ID" === lag($"ID", 1).over(w), lag($"AMOUNT", 1).over(w))
      .otherwise($"AMOUNT")
).show

+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    30|
|    4|789|    80|
+-----+---+------+

Note: since this example doesn't use any partitioning, it could run into performance problems; Spark has to move all rows into a single partition to evaluate the window. On your real data it would help if the problem can be partitioned by some variable, perhaps Window.partitionBy($"ID").orderBy($"SL_NO"), depending on your actual problem and on whether rows with the same ID are sorted together.
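A minimal sketch of that partitioned variant, assuming (as in the sample data) that rows sharing an ID are contiguous when ordered by SL_NO; the coalesce-based simplification is illustrative and not part of the original answer:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag}

// Within an ID partition every row has the same ID, so the
// when($"ID" === lag($"ID", 1)...) test reduces to "is there a
// previous row in this partition?" -- coalesce falls back to the
// current AMOUNT on the first row, where lag() returns null.
val wp = Window.partitionBy($"ID").orderBy($"SL_NO")

dataset.withColumn("AMOUNT",
    coalesce(lag($"AMOUNT", 1).over(wp), $"AMOUNT")
).show

With a partitioned window, show may return the rows in a different order, and the result only matches the unpartitioned version under the contiguity assumption above.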
