Spark Dataframe - Windowing Function - Lag & Lead for Insert & Update output


Problem Description

I need to perform the below operation on dataframes using the windowing functions Lag and Lead.

For each key, I need to perform the following insert and update operations in the final output.

Insert conditions:
1. By default, the record with LAYER_NO=0 needs to be written to the output.
2. If there is any change in the value of COL1, COL2, or COL3 with respect to the previous record, then that record needs to be written to the output.

Example: for key_1 with layer_no=2, the value in COL3 changes from 400 to 600.

Update conditions:
1. If there are NO changes in the values of COL1, COL2, or COL3 with respect to the previous record, but there is a change in the DEPART column, that value needs to be updated in the output.

Example: for key_1 with layer_no=3, there are no changes in COL1, COL2, or COL3, but the value in the DEPART column changes to "xyz", so this needs to be updated in the output.
2. LAYER_NO should also be renumbered sequentially after inserting the record with layer_no=0.
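The question does not show how `values` is built. A minimal sketch that reproduces the sample data below (assuming a local Spark session with `spark.implicits._` imported; the columns are kept as strings so that values like "084" retain their leading zero) could look like:

    // Hypothetical construction of the `values` dataset used by inputDF
    // Assumes: val spark = SparkSession.builder.master("local[*]").getOrCreate()
    //          import spark.implicits._
    val values = Seq(
      ("key_1", 0, "200", "300", "400", "abc"),
      ("key_1", 1, "200", "300", "400", "abc"),
      ("key_1", 2, "200", "300", "600", "uil"),
      ("key_1", 2, "200", "300", "600", "uil"),
      ("key_1", 3, "200", "300", "600", "xyz"),
      ("key_2", 0, "500", "700", "900", "prq"),
      ("key_2", 1, "888", "555", "900", "tep"),
      ("key_3", 0, "111", "222", "333", "lgh"),
      ("key_3", 1, "084", "222", "333", "lgh"),
      ("key_3", 2, "084", "222", "333", "rrr")
    )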

    val inputDF = values.toDF("KEY","LAYER_NO","COL1","COL2","COL3","DEPART")

    inputDF.show()   
    +-----+--------+----+----+----+------+
    |  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
    +-----+--------+----+----+----+------+
    |key_1|       0| 200| 300| 400|   abc| -> default write
    |key_1|       1| 200| 300| 400|   abc|
    |key_1|       2| 200| 300| 600|   uil| -> change in COL3, so write
    |key_1|       2| 200| 300| 600|   uil|
    |key_1|       3| 200| 300| 600|   xyz| -> change in DEPART, so update
    |key_2|       0| 500| 700| 900|   prq| -> default write
    |key_2|       1| 888| 555| 900|   tep| -> change in COL1 & COL2, so write
    |key_3|       0| 111| 222| 333|   lgh| -> default write
    |key_3|       1| 084| 222| 333|   lgh| -> change in COL1, so write
    |key_3|       2| 084| 222| 333|   rrr| -> change in DEPART, so update
    +-----+--------+----+----+----+------+

Expected output:

outputDF.show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+

Recommended Answer

We need to define two Windows to arrive at your expected output: one for checking the change in the DEPART column, and a second for checking the difference in the sum of COL1 through COL3.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Window over rows sharing the same KEY and COL1-COL3 values; spanning the
// whole partition lets last("DEPART") pick up the final DEPART value
val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
                  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
// Window per KEY ordered by LAYER_NO, used for lag() and rank()
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")

Then we simply replace the values in the DEPART column with the correct (last) values, filter the data to rows where the lagged sum differs from the current sum of the columns (plus the rows where LAYER_NO === 0), and finally replace LAYER_NO with a rank.

inputDF
  .withColumn("DEPART", last("DEPART").over(w_col))  // propagate the final DEPART value within each (KEY, COL1, COL2, COL3) group
  .withColumn("row_sum", $"COL1" + $"COL2" + $"COL3")  // fingerprint of COL1-COL3 for change detection
  .withColumn("lag_sum", lag($"row_sum", 1).over(w_key))  // previous row's fingerprint within the same KEY
  .filter($"LAYER_NO" === 0 || not($"row_sum" === $"lag_sum"))  // keep default rows and changed rows
  .withColumn("LAYER_NO", rank().over(w_key) - 1)  // renumber layers sequentially from 0
  .drop("row_sum", "lag_sum")
  .show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+
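Note that comparing the sum of COL1-COL3 can, in principle, miss a change where one column increases and another decreases by the same amount. A more defensive variant (a sketch, not part of the original answer; it reuses the same `w_col` and `w_key` windows) compares each column with its lagged value explicitly, using Spark's null-safe equality operator `<=>` so that the first row of each key, whose lag is null, also counts as changed:

// Hypothetical variant: flag a change in ANY of COL1..COL3 individually
// instead of relying on the sum of the three columns
val changed = Seq("COL1", "COL2", "COL3")
  .map(c => not(col(c) <=> lag(col(c), 1).over(w_key)))  // null-safe inequality vs. previous row
  .reduce(_ || _)

inputDF
  .withColumn("DEPART", last("DEPART").over(w_col))
  .withColumn("changed", changed)   // window expressions must be materialized before filtering
  .filter($"LAYER_NO" === 0 || $"changed")
  .withColumn("LAYER_NO", rank().over(w_key) - 1)
  .drop("changed")
  .show()

The `changed` flag is added via withColumn first because Spark does not allow window functions directly inside a filter condition.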
