Spark Dataframe - Windowing Function - Lag & Lead for Insert & Update output
Question
I need to perform the below operation on a DataFrame using the windowing functions lag and lead.
For each KEY, I need to perform the below insert and update operations in the final output.
Insert condition:
1. By default, the record with LAYER_NO=0 needs to be written to the output.
2. If there is any change in the value of COL1, COL2 or COL3 with respect to the previous record, then that record needs to be written to the output.
Example: for key_1 with layer_no=2, the value in COL3 changes from 400 to 600.
Update condition:
1. If there is NO change in the value of COL1, COL2 or COL3 with respect to the previous record, but there is a change in the DEPART column, this value needs to be updated in the output.
Example: for key_1 with layer_no=3, there is no change in COL1, COL2 or COL3, but the value in the DEPART column changes to "xyz", so this needs to be updated in the output.
2. The LAYER_NO should also be renumbered sequentially after inserting the record with layer_no=0.
val inputDF = values.toDF("KEY", "LAYER_NO", "COL1", "COL2", "COL3", "DEPART")
inputDF.show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc| ---> default write
|key_1|       1| 200| 300| 400|   abc|
|key_1|       2| 200| 300| 600|   uil| ---> change in COL3, so write
|key_1|       2| 200| 300| 600|   uil|
|key_1|       3| 200| 300| 600|   xyz| ---> change in DEPART, so update
|key_2|       0| 500| 700| 900|   prq| ---> default write
|key_2|       1| 888| 555| 900|   tep| ---> change in COL1 & COL2, so write
|key_3|       0| 111| 222| 333|   lgh| ---> default write
|key_3|       1| 084| 222| 333|   lgh| ---> change in COL1, so write
|key_3|       2| 084| 222| 333|   rrr| ---> change in DEPART, so update
+-----+--------+----+----+----+------+
Expected output:
outputDF.show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+
Answer
We need to define two Windows to arrive at your expected output: one for checking the change in the DEPART column, and a second for checking the difference in the sum of COL1 to COL3.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Window over identical (KEY, COL1, COL2, COL3) groups; the unbounded frame lets
// last("DEPART") pick up the final DEPART value of the whole group.
val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3")
  .orderBy("LAYER_NO")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// Window over each KEY, ordered by LAYER_NO, for lag and rank.
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")
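As a quick sanity check (my illustration, using the sample data above): the unbounded frame on w_col is what makes last("DEPART") return the final DEPART of each (KEY, COL1, COL2, COL3) group rather than the current row's value, since the default frame would end at the current row.

// Intermediate step only: every row now carries its group's final DEPART value.
inputDF.withColumn("DEPART", last("DEPART").over(w_col))
  .orderBy("KEY", "LAYER_NO")
  .show()
// e.g. key_1 layers 2 and 3 (the (200, 300, 600) group) all show DEPART = xyz,
// and key_3 layers 1 and 2 both show DEPART = rrr.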
Then we simply replace the values in the DEPART column with the correct values, and filter the data down to rows where the lagged sum differs from the current sum of the columns (plus the rows where LAYER_NO === 0). Lastly, we replace LAYER_NO with the rank within each key.
inputDF
  .withColumn("DEPART", last("DEPART").over(w_col))            // final DEPART per (KEY, COL1-COL3) group
  .withColumn("row_sum", $"COL1" + $"COL2" + $"COL3")          // fingerprint of COL1-COL3
  .withColumn("lag_sum", lag($"row_sum", 1).over(w_key))       // previous row's fingerprint
  .filter($"LAYER_NO" === 0 || not($"row_sum" === $"lag_sum")) // keep defaults and changed rows
  .withColumn("LAYER_NO", rank.over(w_key) - 1)                // renumber layers sequentially from 0
  .drop("row_sum", "lag_sum")
  .show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+
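One caveat worth flagging (an observation of mine, not part of the original answer): fingerprinting COL1 to COL3 by their sum can miss a change when two columns move by offsetting amounts, say COL1 up by 100 while COL2 drops by 100. A sketch that lags a struct of the three columns instead of their sum sidesteps that collision:

// Same windows as above; compare COL1-COL3 as a struct instead of a sum.
val cols = struct($"COL1", $"COL2", $"COL3")
inputDF
  .withColumn("DEPART", last("DEPART").over(w_col))
  .withColumn("prev_cols", lag(cols, 1).over(w_key))       // previous row's columns
  .filter($"LAYER_NO" === 0 || not(cols <=> $"prev_cols")) // null-safe struct comparison
  .withColumn("LAYER_NO", rank.over(w_key) - 1)
  .drop("prev_cols")
  .show()

On the sample data above this produces the same output as the accepted answer.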