In Spark scala, how to check between adjacent rows in a dataframe
Question
How can I check the dates on adjacent rows (preceding and next) in a Dataframe? This should happen at a key level.

I have the following data after sorting on key, dates:
source_Df.show()
+-----+--------+------------+------------+
| key | code | begin_dt | end_dt |
+-----+--------+------------+------------+
| 10 | ABC | 2018-01-01 | 2018-01-08 |
| 10 | BAC | 2018-01-03 | 2018-01-15 |
| 10 | CAS | 2018-01-03 | 2018-01-21 |
| 20 | AAA | 2017-11-12 | 2018-01-03 |
| 20 | DAS | 2018-01-01 | 2018-01-12 |
| 20 | EDS | 2018-02-01 | 2018-02-16 |
+-----+--------+------------+------------+
When the dates of these rows overlap (i.e. the current row's begin_dt falls between the begin and end dates of the previous row), I need all such rows to carry the lowest begin date and the highest end date. Here is the output I need:
final_Df.show()
+-----+--------+------------+------------+
| key | code | begin_dt | end_dt |
+-----+--------+------------+------------+
| 10 | ABC | 2018-01-01 | 2018-01-21 |
| 10 | BAC | 2018-01-01 | 2018-01-21 |
| 10 | CAS | 2018-01-01 | 2018-01-21 |
| 20 | AAA | 2017-11-12 | 2018-01-12 |
| 20 | DAS | 2017-11-12 | 2018-01-12 |
| 20 | EDS | 2018-02-01 | 2018-02-16 |
+-----+--------+------------+------------+
Appreciate any ideas to achieve this. Thanks in advance!
Answer
Here's one approach:
- Create a new column group_id with a null value if begin_dt falls within the date range of the previous row; otherwise assign it a unique integer
- Backfill the nulls in group_id with the last non-null value
- Compute min(begin_dt) and max(end_dt) within each (key, group_id) partition
Sample code below:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// (as in spark-shell; a standalone app would also need `import spark.implicits._`)

val df = Seq(
  (10, "ABC", "2018-01-01", "2018-01-08"),
  (10, "BAC", "2018-01-03", "2018-01-15"),
  (10, "CAS", "2018-01-03", "2018-01-21"),
  (20, "AAA", "2017-11-12", "2018-01-03"),
  (20, "DAS", "2018-01-01", "2018-01-12"),
  (20, "EDS", "2018-02-01", "2018-02-16")
).toDF("key", "code", "begin_dt", "end_dt")

val win1 = Window.partitionBy($"key").orderBy($"begin_dt", $"end_dt")
val win2 = Window.partitionBy($"key", $"group_id")

df.
  // Step 1: null if begin_dt falls in the previous row's range, otherwise a unique id
  withColumn("group_id", when(
      $"begin_dt".between(lag($"begin_dt", 1).over(win1), lag($"end_dt", 1).over(win1)), null
    ).otherwise(monotonically_increasing_id)
  ).
  // Step 2: backfill the nulls with the last non-null group_id
  withColumn("group_id", last($"group_id", ignoreNulls = true).
    over(win1.rowsBetween(Window.unboundedPreceding, 0))
  ).
  // Step 3: min/max dates within each (key, group_id) partition
  withColumn("begin_dt2", min($"begin_dt").over(win2)).
  withColumn("end_dt2", max($"end_dt").over(win2)).
  orderBy("key", "begin_dt", "end_dt").
  show
// +---+----+----------+----------+-------------+----------+----------+
// |key|code| begin_dt| end_dt| group_id| begin_dt2| end_dt2|
// +---+----+----------+----------+-------------+----------+----------+
// | 10| ABC|2018-01-01|2018-01-08|1047972020224|2018-01-01|2018-01-21|
// | 10| BAC|2018-01-03|2018-01-15|1047972020224|2018-01-01|2018-01-21|
// | 10| CAS|2018-01-03|2018-01-21|1047972020224|2018-01-01|2018-01-21|
// | 20| AAA|2017-11-12|2018-01-03| 455266533376|2017-11-12|2018-01-12|
// | 20| DAS|2018-01-01|2018-01-12| 455266533376|2017-11-12|2018-01-12|
// | 20| EDS|2018-02-01|2018-02-16| 455266533377|2018-02-01|2018-02-16|
// +---+----+----------+----------+-------------+----------+----------+
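To get the exact final_Df shape asked for in the question (only key, code, begin_dt, end_dt), you can drop the helper column and rename the computed dates. A minimal follow-up sketch, assuming the transformation chain above (everything before the final orderBy/show) is first assigned to a val (resultDf is a name introduced here just for illustration):

// resultDf is assumed to hold the DataFrame produced by the chain above,
// i.e. up to and including the end_dt2 column (before orderBy/show)
val finalDf = resultDf.
  select($"key", $"code", $"begin_dt2".as("begin_dt"), $"end_dt2".as("end_dt")).
  orderBy("key", "begin_dt", "end_dt")

finalDf.show
// should match the expected final_Df output shown in the question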