Implement SCD Type 2 in Spark

Problem Description

Trying to implement SCD Type 2 logic in Spark 2.4.4. I have two DataFrames: one containing the 'Existing Data' and the other containing the 'New Incoming Data'.

Input and expected output are given below. What needs to happen is:

  1. All incoming rows should get appended to the existing data.

  2. Only the following 3 rows, which were previously 'active', should become inactive, with the appropriate 'endDate' populated as follows:

pk=1, amount=20 => row should become 'inactive' and 'endDate' should be set to the 'startDate' of the following row (lead)

pk=2, amount=100 => row should become 'inactive' and 'endDate' should be set to the 'startDate' of the following row (lead)

pk=3, amount=750 => row should become 'inactive' and 'endDate' should be set to the 'startDate' of the following row (lead)

How do I do this in Spark?
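(For reference, the 'lead' relationship described in point 2 can be expressed directly with a window function. The sketch below illustrates only that rule, not the full merge logic; df stands for a hypothetical union of the existing and incoming rows, and a SparkSession named spark is assumed.)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lead, when, lit}
import spark.implicits._   // assumes a SparkSession named `spark` is in scope

// hypothetical: df = existing rows unioned with incoming rows
val byPk = Window.partitionBy($"pk").orderBy($"startDate")
val closed = df
  .withColumn("endDate", coalesce($"endDate", lead($"startDate", 1).over(byPk)))  // close open rows with the next row's startDate
  .withColumn("active", when($"endDate".isNull, lit(1)).otherwise(lit(0)))        // only the last row per pk stays active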

Existing Data:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    10|2019-01-01 12:00:00|2019-01-20 05:00:00|     0|
|  1|    20|2019-01-20 05:00:00|               null|     1|
|  2|   100|2019-01-01 00:00:00|               null|     1|
|  3|    75|2019-01-01 06:00:00|2019-01-26 08:00:00|     0|
|  3|   750|2019-01-26 08:00:00|               null|     1|
| 10|    40|2019-01-01 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+

New Incoming Data:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    50|2019-02-01 07:00:00|2019-02-02 08:00:00|     0|
|  1|    75|2019-02-02 08:00:00|               null|     1|
|  2|   200|2019-02-01 05:00:00|2019-02-01 13:00:00|     0|
|  2|    60|2019-02-01 13:00:00|2019-02-01 19:00:00|     0|
|  2|   500|2019-02-01 19:00:00|               null|     1|
|  3|   175|2019-02-01 00:00:00|               null|     1|
|  4|    50|2019-02-02 12:00:00|2019-02-02 14:00:00|     0|
|  4|   300|2019-02-02 14:00:00|               null|     1|
|  5|   500|2019-02-02 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+

Expected Output:

+---+------+-------------------+-------------------+------+
| pk|amount|          startDate|            endDate|active|
+---+------+-------------------+-------------------+------+
|  1|    10|2019-01-01 12:00:00|2019-01-20 05:00:00|     0|
|  1|    20|2019-01-20 05:00:00|2019-02-01 07:00:00|     0|
|  1|    50|2019-02-01 07:00:00|2019-02-02 08:00:00|     0|
|  1|    75|2019-02-02 08:00:00|               null|     1|
|  2|   100|2019-01-01 00:00:00|2019-02-01 05:00:00|     0|
|  2|   200|2019-02-01 05:00:00|2019-02-01 13:00:00|     0|
|  2|    60|2019-02-01 13:00:00|2019-02-01 19:00:00|     0|
|  2|   500|2019-02-01 19:00:00|               null|     1|
|  3|    75|2019-01-01 06:00:00|2019-01-26 08:00:00|     0|
|  3|   750|2019-01-26 08:00:00|2019-02-01 00:00:00|     0|
|  3|   175|2019-02-01 00:00:00|               null|     1|
|  4|    50|2019-02-02 12:00:00|2019-02-02 14:00:00|     0|
|  4|   300|2019-02-02 14:00:00|               null|     1|
|  5|   500|2019-02-02 00:00:00|               null|     1|
| 10|    40|2019-01-01 00:00:00|               null|     1|
+---+------+-------------------+-------------------+------+
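To reproduce the answer below, the two inputs can be built as df_old and df_new, the variable names used in the answer's code. A minimal sketch, assuming a SparkSession named spark and showing only a few of the rows from the tables above:

import java.sql.Timestamp
import spark.implicits._   // assumes a SparkSession named `spark` is in scope

// helper to parse the timestamps shown in the tables
def ts(s: String): Timestamp = Timestamp.valueOf(s)

// only a few sample rows are shown; extend with the full tables above
val df_old = Seq(
  (1, 10,  ts("2019-01-01 12:00:00"), Option(ts("2019-01-20 05:00:00")), 0),
  (1, 20,  ts("2019-01-20 05:00:00"), None: Option[Timestamp],           1),
  (10, 40, ts("2019-01-01 00:00:00"), None: Option[Timestamp],           1)
).toDF("pk", "amount", "startDate", "endDate", "active")

val df_new = Seq(
  (1, 50, ts("2019-02-01 07:00:00"), Option(ts("2019-02-02 08:00:00")), 0),
  (1, 75, ts("2019-02-02 08:00:00"), None: Option[Timestamp],           1)
).toDF("pk", "amount", "startDate", "endDate", "active")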

Recommended Answer

You can start by selecting the first startDate for each pk group from the new DataFrame and joining it with the old one to update the desired columns. Then, union the join result with the new DataFrame.

Something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._   // for the $"col" syntax; assumes a SparkSession named `spark`

// get the first startDate for each pk group in the incoming data
val w = Window.partitionBy($"pk").orderBy($"startDate")
val updates = df_new
  .withColumn("rn", row_number().over(w))
  .filter("rn = 1")
  .select($"pk", $"startDate")

// join with the old data and close out rows that are still open (endDate is null, active = 1)
// whenever the same pk appears in the incoming data; the condition is evaluated once into a
// flag column so that overwriting endDate does not affect the update of active
val joinOldNew = df_old.join(updates.alias("new"), Seq("pk"), "left")
  .withColumn("toClose", $"endDate".isNull && $"active" === lit(1) && $"new.startDate".isNotNull)
  .withColumn("endDate", when($"toClose", $"new.startDate").otherwise($"endDate"))
  .withColumn("active", when($"toClose", lit(0)).otherwise($"active"))
  .drop("toClose")
  .drop($"new.startDate")

// append all incoming rows (unionAll is deprecated; union is the Spark 2.x equivalent)
val result = joinOldNew.union(df_new)
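
union does not guarantee any particular row order, so sorting by pk and startDate makes the result easier to compare with the expected output above:

result.orderBy($"pk", $"startDate").show(false)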
