Spark DataFrame - Window Functions - Lag & Lead for Insert & Update Output
Problem description
I need to perform the operation below on DataFrames using the window functions lag and lead.
For each key, I need to perform the following inserts and updates in the final output.
Insert conditions:
1. By default, the row with LAYER_NO=0 must be written to the output.
2. If the value of COL1, COL2 or COL3 changes with respect to the previous record, that record must be written to the output.
Example: for key_1 with LAYER_NO=2, the value of COL3 changes from 400 to 600.
Update conditions:
1. If COL1, COL2 and COL3 are unchanged with respect to the previous record but the DEPART column has changed, the DEPART value must be updated in the output.
Example: for key_1 with LAYER_NO=3, COL1, COL2 and COL3 are unchanged, but DEPART changes to "xyz", so this value must be updated in the output.
2. LAYER_NO must also be renumbered sequentially (0, 1, 2, ...) after the record with LAYER_NO=0 is inserted.
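// `values` (not shown in the question) is assumed to be a Seq of 6-tuples holding the rows below
val inputDF = values.toDF("KEY", "LAYER_NO", "COL1", "COL2", "COL3", "DEPART")
inputDF.show()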
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc| -> default, so write
|key_1|       1| 200| 300| 400|   abc|
|key_1|       2| 200| 300| 600|   uil| -> change in COL3, so write
|key_1|       2| 200| 300| 600|   uil|
|key_1|       3| 200| 300| 600|   xyz| -> change in DEPART only, so update
|key_2|       0| 500| 700| 900|   prq| -> default, so write
|key_2|       1| 888| 555| 900|   tep| -> change in COL1 & COL2, so write
|key_3|       0| 111| 222| 333|   lgh| -> default, so write
|key_3|       1| 084| 222| 333|   lgh| -> change in COL1, so write
|key_3|       2| 084| 222| 333|   rrr| -> change in DEPART only, so update
+-----+--------+----+----+----+------+
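To run the answer locally, `inputDF` can be rebuilt from the table above. This is a minimal sketch for spark-shell or a Scala script under stated assumptions: the question does not show how `values` is defined, so here it is assumed to be a `Seq` of tuples with COL1 to COL3 stored as strings (the leading zero in `084` suggests strings, since a numeric column would display it as `84`).

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimenting; spark-shell already provides one named `spark`.
val spark = SparkSession.builder().master("local[*]").appName("lag-lead-input").getOrCreate()
import spark.implicits._ // enables .toDF on a Seq of tuples

// Assumed definition of `values`: the rows of the table above, COL1..COL3 as strings.
val values = Seq(
  ("key_1", 0, "200", "300", "400", "abc"),
  ("key_1", 1, "200", "300", "400", "abc"),
  ("key_1", 2, "200", "300", "600", "uil"),
  ("key_1", 2, "200", "300", "600", "uil"),
  ("key_1", 3, "200", "300", "600", "xyz"),
  ("key_2", 0, "500", "700", "900", "prq"),
  ("key_2", 1, "888", "555", "900", "tep"),
  ("key_3", 0, "111", "222", "333", "lgh"),
  ("key_3", 1, "084", "222", "333", "lgh"),
  ("key_3", 2, "084", "222", "333", "rrr")
)

val inputDF = values.toDF("KEY", "LAYER_NO", "COL1", "COL2", "COL3", "DEPART")
inputDF.show()
```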
Expected output:
outputDF.show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+
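The rules above can be checked without Spark. This is a minimal plain-Scala sketch, not the answer's method: it assumes each key's rows are processed in LAYER_NO order, and `Rec`, `collapse` and `process` are hypothetical names. It walks each key's rows once, appending a row when COL1 to COL3 change (insert) and otherwise overwriting DEPART on the last written row (update), then renumbers LAYER_NO from 0. On the sample rows it yields the six rows of the expected output.

```scala
// Hypothetical record type for one input row; COL1..COL3 kept as strings, as in the sample data.
case class Rec(key: String, layerNo: Int, col1: String, col2: String, col3: String, depart: String)

// Applies the insert/update rules to the rows of ONE key, assumed sorted by layerNo.
def collapse(rows: Seq[Rec]): Seq[Rec] = {
  val kept = rows.foldLeft(Vector.empty[Rec]) { (out, r) =>
    out.lastOption match {
      // Insert: the first row of the key (LAYER_NO = 0) is always written.
      case None => out :+ r
      // Update: the last written row always carries the previous row's COL1..COL3,
      // so equal values mean only DEPART may have changed; overwrite it.
      case Some(prev) if (prev.col1, prev.col2, prev.col3) == (r.col1, r.col2, r.col3) =>
        out.init :+ prev.copy(depart = r.depart)
      // Insert: at least one of COL1..COL3 changed.
      case Some(_) => out :+ r
    }
  }
  // Renumber LAYER_NO sequentially from 0.
  kept.zipWithIndex.map { case (r, i) => r.copy(layerNo = i) }
}

// Groups rows by key (keys sorted for a stable order) and collapses each key's layers.
def process(rows: Seq[Rec]): Seq[Rec] =
  rows.groupBy(_.key).toSeq.sortBy(_._1).flatMap { case (_, rs) => collapse(rs.sortBy(_.layerNo)) }

val input = Seq(
  Rec("key_1", 0, "200", "300", "400", "abc"),
  Rec("key_1", 1, "200", "300", "400", "abc"),
  Rec("key_1", 2, "200", "300", "600", "uil"),
  Rec("key_1", 2, "200", "300", "600", "uil"),
  Rec("key_1", 3, "200", "300", "600", "xyz"),
  Rec("key_2", 0, "500", "700", "900", "prq"),
  Rec("key_2", 1, "888", "555", "900", "tep"),
  Rec("key_3", 0, "111", "222", "333", "lgh"),
  Rec("key_3", 1, "084", "222", "333", "lgh"),
  Rec("key_3", 2, "084", "222", "333", "rrr")
)

process(input).foreach(println)
```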
Recommended answer
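We need to define two windows to arrive at your expected output. The first, w_col, groups the rows of a key that share the same COL1, COL2 and COL3, so that the latest DEPART value of that group can be picked. The second, w_key, orders each key by LAYER_NO, so that the sum of COL1 to COL3 can be compared with the previous row's sum.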
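import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._ // for the $"..." column syntax

// Rows of one KEY sharing the same COL1, COL2 and COL3; the frame spans the whole partition
val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
// All rows of one KEY, ordered by LAYER_NO
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")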
Then we replace each value in the DEPART column with the last DEPART value of its w_col group, and keep only the rows where the current sum of the columns differs from the lagged sum, plus the rows where LAYER_NO === 0. Lastly, we replace LAYER_NO with the row's rank within its key minus 1, so that numbering starts at 0.
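inputDF
  // Every row gets the DEPART of the last row (by LAYER_NO) with the same KEY and COL1..COL3
  .withColumn("DEPART", last("DEPART").over(w_col))
  // Sum of COL1..COL3 for this row
  .withColumn("row_sum", $"COL1" + $"COL2" + $"COL3")
  // Sum of the previous row within the same KEY (null for the first row)
  .withColumn("lag_sum", lag($"row_sum", 1).over(w_key))
  // Keep the first layer and every row whose sum differs from the previous row's
  .filter($"LAYER_NO" === 0 || $"row_sum" =!= $"lag_sum")
  // Renumber the surviving rows 0, 1, 2, ... per KEY
  .withColumn("LAYER_NO", rank().over(w_key) - 1)
  .drop("row_sum", "lag_sum").show()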
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+
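Comparing sums can miss a change that keeps the sum equal: (200, 300, 600) followed by (300, 200, 600) sums to 1100 both times, so the second row would be treated as an update. Also, because w_col partitions by the column values themselves, non-adjacent rows with the same values (A, then B, then A again) share one DEPART. The following is a sketch of a variant, not the answer's own method. It uses lag on each column and the "gaps and islands" pattern: flag rows where any column differs from the previous row, take a running sum of the flags as a record id, then keep the first row of each record together with that record's last DEPART. `collapseLayers` is a hypothetical helper name, and the sketch assumes LAYER_NO is unique within a key apart from exact duplicate rows.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("lag-islands").getOrCreate()
import spark.implicits._

// Hypothetical helper: collapses each KEY's layers according to the insert/update rules.
def collapseLayers(df: DataFrame): DataFrame = {
  val byKey = Window.partitionBy("KEY").orderBy("LAYER_NO")

  // True when a row starts a new output record: the first row of a key (lag is null)
  // or any of COL1..COL3 differs from the previous row. <=> is null-safe equality.
  val changed = Seq("COL1", "COL2", "COL3")
    .map(c => !(col(c) <=> lag(col(c), 1).over(byKey)))
    .reduce(_ || _)

  val flagged = df
    .withColumn("is_new", when(changed, 1).otherwise(0))
    // Running count of the flags: the 1-based output record each row belongs to.
    .withColumn("grp", sum("is_new").over(byKey.rowsBetween(Window.unboundedPreceding, Window.currentRow)))

  // All rows of one output record, so the record's latest DEPART can be picked.
  val byGroup = Window.partitionBy("KEY", "grp").orderBy("LAYER_NO")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

  flagged
    .withColumn("DEPART", last("DEPART").over(byGroup))    // update: latest DEPART wins
    .filter(col("is_new") === 1)                            // insert: first row of each record
    .withColumn("LAYER_NO", (col("grp") - 1).cast("int"))  // renumber from 0
    .select("KEY", "LAYER_NO", "COL1", "COL2", "COL3", "DEPART")
    .orderBy("KEY", "LAYER_NO")
}

// Sample data from the question; COL1..COL3 assumed to be strings.
val inputDF = Seq(
  ("key_1", 0, "200", "300", "400", "abc"),
  ("key_1", 1, "200", "300", "400", "abc"),
  ("key_1", 2, "200", "300", "600", "uil"),
  ("key_1", 2, "200", "300", "600", "uil"),
  ("key_1", 3, "200", "300", "600", "xyz"),
  ("key_2", 0, "500", "700", "900", "prq"),
  ("key_2", 1, "888", "555", "900", "tep"),
  ("key_3", 0, "111", "222", "333", "lgh"),
  ("key_3", 1, "084", "222", "333", "lgh"),
  ("key_3", 2, "084", "222", "333", "rrr")
).toDF("KEY", "LAYER_NO", "COL1", "COL2", "COL3", "DEPART")

collapseLayers(inputDF).show()
```

The filter on `is_new` is applied after the `byGroup` window has picked the last DEPART, so rows that are dropped still contribute their DEPART to the record they belong to.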