Spark Dataframe - Windowing Function - Lag & Lead for Insert & Update output


Problem Description

I need to perform the operation below on DataFrames using the windowing functions Lag and Lead.

For each KEY, I need to perform the following insert and update in the final output.

Insert condition:
1. By default, the record with LAYER_NO = 0 needs to be written to the output.
2. If there is any change in the value of COL1, COL2, or COL3 with respect to its previous record, then that record needs to be written to the output.

Example: for key_1 with layer_no = 2, there is a change of value in COL3 from 400 to 600.

Update condition:
1. If there are no changes in the values of COL1, COL2, and COL3 with respect to its previous record, but there is a change in the DEPART column, then this value needs to be updated in the output.

Example: for key_1 with layer_no = 3, there are no changes in COL1, COL2, or COL3, but the value in the DEPART column changes to "xyz", so this needs to be updated in the output.
2. The LAYER_NO should also be renumbered sequentially after inserting the record with layer_no = 0.
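
The post does not show the `values` sequence that the snippet below references; here is a minimal reconstruction from the show() output (an assumption: COL1-COL3 are typed as strings purely so that "084" keeps its leading zero; Spark casts them to numeric in the sum used by the answer).

    // Hypothetical reconstruction of the input data from the show() output
    // below (run in spark-shell; otherwise `import spark.implicits._` first).
    // COL1-COL3 are strings only to preserve the leading zero in "084".
    val values = Seq(
      ("key_1", 0, "200", "300", "400", "abc"),
      ("key_1", 1, "200", "300", "400", "abc"),
      ("key_1", 2, "200", "300", "600", "uil"),
      ("key_1", 2, "200", "300", "600", "uil"),
      ("key_1", 3, "200", "300", "600", "xyz"),
      ("key_2", 0, "500", "700", "900", "prq"),
      ("key_2", 1, "888", "555", "900", "tep"),
      ("key_3", 0, "111", "222", "333", "lgh"),
      ("key_3", 1, "084", "222", "333", "lgh"),
      ("key_3", 2, "084", "222", "333", "rrr")
    )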

    val inputDF = values.toDF("KEY", "LAYER_NO", "COL1", "COL2", "COL3", "DEPART")

    inputDF.show()
    +-----+--------+----+----+----+------+
    |  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
    +-----+--------+----+----+----+------+
    |key_1|       0| 200| 300| 400|   abc| -> default: write
    |key_1|       1| 200| 300| 400|   abc|
    |key_1|       2| 200| 300| 600|   uil| -> change in COL3, so write
    |key_1|       2| 200| 300| 600|   uil|
    |key_1|       3| 200| 300| 600|   xyz| -> change in DEPART, so update
    |key_2|       0| 500| 700| 900|   prq| -> default: write
    |key_2|       1| 888| 555| 900|   tep| -> change in COL1 & COL2, so write
    |key_3|       0| 111| 222| 333|   lgh| -> default: write
    |key_3|       1| 084| 222| 333|   lgh| -> change in COL1, so write
    |key_3|       2| 084| 222| 333|   rrr| -> change in DEPART, so update
    +-----+--------+----+----+----+------+

Expected output:

outputDF.show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+

Recommended Answer

We need to define two Windows to arrive at your expected output: one for checking the change in the DEPART column, and a second for checking the change in the sum of COL1 through COL3.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Full, unbounded frame per (KEY, COL1, COL2, COL3) group, so that
// last("DEPART") returns the group's final DEPART value on every row.
val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
                  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
// Per-key window ordered by LAYER_NO, used for lag() and rank().
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")

Then we simply replace the values in the DEPART column with the correct (final) values, and filter the data down to rows where the lagged sum differs from the current sum of the columns, as well as rows where LAYER_NO === 0 (lag returns null on each key's first row, so the inequality test alone would drop them). Lastly, we replace LAYER_NO with a zero-based rank.

inputDF.withColumn("DEPART", last("DEPART").over(w_col))        // propagate each group's final DEPART
  .withColumn("row_sum", $"COL1" + $"COL2" + $"COL3")
  .withColumn("lag_sum", lag($"row_sum", 1).over(w_key))        // previous row's sum within the key
  .filter($"LAYER_NO" === 0 || not($"row_sum" === $"lag_sum"))  // keep defaults and changed rows
  .withColumn("LAYER_NO", rank().over(w_key) - 1)               // renumber layers from 0
  .drop("row_sum", "lag_sum")
  .show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+
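
One caveat: comparing the sum of COL1-COL3 can miss offsetting changes (e.g. COL1 going up by 100 while COL2 goes down by 100 leaves the sum unchanged). If that matters for your data, here is a sketch of a variant that reuses the same two windows but compares each column to its lagged value directly:

// Variant (sketch): flag a row as changed when any of COL1..COL3 differs
// from the previous row of the same KEY. lag() yields null on each key's
// first row, so =!= is null (falsy) there; LAYER_NO === 0 keeps that row.
val changed = $"COL1" =!= lag($"COL1", 1).over(w_key) ||
              $"COL2" =!= lag($"COL2", 1).over(w_key) ||
              $"COL3" =!= lag($"COL3", 1).over(w_key)

inputDF.withColumn("DEPART", last("DEPART").over(w_col))
  .filter($"LAYER_NO" === 0 || changed)
  .withColumn("LAYER_NO", rank().over(w_key) - 1)
  .show()

On the sample data this produces the same result as the answer above.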

