Implement SCD Type 2 in Spark

Question

Trying to implement SCD Type 2 logic in Spark 2.4.4. I've two Data Frames; one containing 'Existing Data' and the other containing 'New Incoming Data'.

Input and expected output are given below. What needs to happen is:

  1. All incoming rows should get appended to the existing data.

  2. Only the following 3 rows, which were previously 'active', should become inactive, with the appropriate 'endDate' populated as follows:

pk=1, amount=20 => Row should become 'inactive' & 'endDate' is the 'startDate' of the following row (lead)

pk=2, amount=100 => Row should become 'inactive' & 'endDate' is the 'startDate' of the following row (lead)

pk=3, amount=750 => Row should become 'inactive' & 'endDate' is the 'startDate' of the following row (lead)

How do I do this in Spark?

Existing Data:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    10|2019-01-01 12:00:00|2019-01-20 05:00:00|     0|
|  1|    20|2019-01-20 05:00:00|               null|     1|
|  2|   100|2019-01-01 00:00:00|               null|     1|
|  3|    75|2019-01-01 06:00:00|2019-01-26 08:00:00|     0|
|  3|   750|2019-01-26 08:00:00|               null|     1|
| 10|    40|2019-01-01 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+

New Incoming Data:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    50|2019-02-01 07:00:00|2019-02-02 08:00:00|     0|
|  1|    75|2019-02-02 08:00:00|               null|     1|
|  2|   200|2019-02-01 05:00:00|2019-02-01 13:00:00|     0|
|  2|    60|2019-02-01 13:00:00|2019-02-01 19:00:00|     0|
|  2|   500|2019-02-01 19:00:00|               null|     1|
|  3|   175|2019-02-01 00:00:00|               null|     1|
|  4|    50|2019-02-02 12:00:00|2019-02-02 14:00:00|     0|
|  4|   300|2019-02-02 14:00:00|               null|     1|
|  5|   500|2019-02-02 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+

Expected Output:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    10|2019-01-01 12:00:00|2019-01-20 05:00:00|     0|
|  1|    20|2019-01-20 05:00:00|2019-02-01 07:00:00|     0|
|  1|    50|2019-02-01 07:00:00|2019-02-02 08:00:00|     0|
|  1|    75|2019-02-02 08:00:00|               null|     1|
|  2|   100|2019-01-01 00:00:00|2019-02-01 05:00:00|     0|
|  2|   200|2019-02-01 05:00:00|2019-02-01 13:00:00|     0|
|  2|    60|2019-02-01 13:00:00|2019-02-01 19:00:00|     0|
|  2|   500|2019-02-01 19:00:00|               null|     1|
|  3|    75|2019-01-01 06:00:00|2019-01-26 08:00:00|     0|
|  3|   750|2019-01-26 08:00:00|2019-02-01 00:00:00|     1|
|  3|   175|2019-02-01 00:00:00|               null|     1|
|  4|    50|2019-02-02 12:00:00|2019-02-02 14:00:00|     0|
|  4|   300|2019-02-02 14:00:00|               null|     1|
|  5|   500|2019-02-02 00:00:00|               null|     1|
| 10|    40|2019-01-01 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+
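
For anyone who wants to run the answer below locally, here is a minimal sketch (assuming a spark-shell session, so spark and spark.implicits._ are in scope) that builds the two input DataFrames from the tables above. The names df_old and df_new are the ones used in the answer:

import org.apache.spark.sql.functions.to_timestamp
import spark.implicits._

// 'Existing Data' from the first table
val df_old = Seq(
  (1, 10, "2019-01-01 12:00:00", Some("2019-01-20 05:00:00"), 0),
  (1, 20, "2019-01-20 05:00:00", None, 1),
  (2, 100, "2019-01-01 00:00:00", None, 1),
  (3, 75, "2019-01-01 06:00:00", Some("2019-01-26 08:00:00"), 0),
  (3, 750, "2019-01-26 08:00:00", None, 1),
  (10, 40, "2019-01-01 00:00:00", None, 1)
).toDF("pk", "amount", "startDate", "endDate", "active")
 .withColumn("startDate", to_timestamp($"startDate"))
 .withColumn("endDate", to_timestamp($"endDate"))

// 'New Incoming Data' from the second table
val df_new = Seq(
  (1, 50, "2019-02-01 07:00:00", Some("2019-02-02 08:00:00"), 0),
  (1, 75, "2019-02-02 08:00:00", None, 1),
  (2, 200, "2019-02-01 05:00:00", Some("2019-02-01 13:00:00"), 0),
  (2, 60, "2019-02-01 13:00:00", Some("2019-02-01 19:00:00"), 0),
  (2, 500, "2019-02-01 19:00:00", None, 1),
  (3, 175, "2019-02-01 00:00:00", None, 1),
  (4, 50, "2019-02-02 12:00:00", Some("2019-02-02 14:00:00"), 0),
  (4, 300, "2019-02-02 14:00:00", None, 1),
  (5, 500, "2019-02-02 00:00:00", None, 1)
).toDF("pk", "amount", "startDate", "endDate", "active")
 .withColumn("startDate", to_timestamp($"startDate"))
 .withColumn("endDate", to_timestamp($"endDate"))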

Answer

You can start by selecting the first startDate for each pk group from the new DataFrame and joining it with the old one to update the desired columns. Then you can union the join result with the new DataFrame.

Something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number, when}

// get the first startDate per pk group in the incoming data
val w = Window.partitionBy($"pk").orderBy($"startDate")
val updates = df_new.withColumn("rn", row_number().over(w)).filter("rn = 1").select($"pk", $"startDate")

// join with the old data and close the currently active row when there is a match;
// the close condition is evaluated into its own column first, so that overwriting
// endDate does not change the outcome of the subsequent active update
val joinOldNew = df_old.join(updates.alias("new"), Seq("pk"), "left")
                       .withColumn("needsClose", $"endDate".isNull && $"active" === lit(1) && $"new.startDate".isNotNull)
                       .withColumn("endDate", when($"needsClose", $"new.startDate").otherwise($"endDate"))
                       .withColumn("active", when($"needsClose", lit(0)).otherwise($"active"))
                       .drop("needsClose")
                       .drop($"new.startDate")

// append all incoming rows
val result = joinOldNew.union(df_new)
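
To compare the result against the expected output above, it helps to sort by the business key and the validity start (a minimal check, assuming the DataFrames were built as in the sketch earlier):

// sort by pk and startDate so the rows line up with the expected output table
result.orderBy($"pk", $"startDate").show(false)

Note that union matches columns by position, so the column order of joinOldNew and df_new must be identical; from Spark 2.3 onwards, unionByName is an alternative that matches columns by name instead.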
