每次特定列更改后获取一行 [英] Get a row after each time a certain column changes
问题描述
+---------------+-------+-------+-------------------+
|ID_NOTIFICATION|CD_ETAT|TYP_MVT|DT_FIN |
+---------------+-------+-------+-------------------+
|3111341 |AT |C |2019-06-12 00:03:37|
|3111341 |AN |M |2019-06-12 15:08:43|
|3111341 |AN |M |2019-06-12 15:10:11|
|3111341 |AN |M |2019-06-12 15:10:50|
|3111341 |AN |M |2019-06-12 15:11:34|
|3111341 |AN |M |2019-06-12 15:12:03|
|3111341 |AN |M |2019-06-12 15:14:04|
|3111341 |AN |M |2019-06-12 15:14:40|
|3111341 |AN |M |2019-06-12 15:15:22|
|3111341 |AN |M |2019-06-12 15:15:57|
|3111341 |AN |M |2019-06-12 15:25:28|
|3111341 |AN |M |2019-06-12 15:25:29|
|3111341 |AN |M |2019-06-12 15:27:50|
|3111341 |AN |M |2019-06-12 15:28:37|
|3111341 |AN |M |2019-06-12 15:32:22|
|3111341 |AN |M |2019-06-12 15:32:59|
|3111341 |EC |M |2019-06-12 15:33:04|
|3111341 |AN |M |2019-06-13 00:04:33|
|3111341 |TE |M |9999-01-01 00:00:00|
+---------------+-------+-------+-------------------+
每次更改 CD_ETAT 后,我都需要从上述数据框中提取一行.
I need to extract one row from the above dataframe after each change on CD_ETAT.
这个:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("CD_ETAT").orderBy("DT_ETAT")
df.withColumn("row_num", row_number().over(window))
.filter($"row_num" === 1)
.drop("row_num")
似乎有效但实际上无效,因为它只从每个 CD_ETAT 中获取一行.对于上面的例子,它给出:
seemed to work but actually not because it gets only one row from each CD_ETAT. For the above example it gives:
+---------------+-------+-------+-------------------+
|ID_NOTIFICATION|CD_ETAT|TYP_MVT|DT_FIN |
+---------------+-------+-------+-------------------+
|3111341 |EC |M |2019-06-12 15:33:04|
|3111341 |AN |M |2019-06-13 00:04:33|
|3111341 |TE |M |9999-01-01 00:00:00|
|3111341 |AT |C |2019-06-12 00:03:37|
+---------------+-------+-------+-------------------+
但是正确的输出还包括输入数据帧的第二行.
But a correct Output would also include the second line of the input dataframe.
每次更改输入中的 CD_ETAT 后,我都希望输出中有一行.
I want one row in output after each change on CD_ETAT in input.
谢谢.
推荐答案
想法:对于每一行,您都需要之前的 CD_ETAT
.您可以使用窗口函数或通过自连接或通过手动转换为 RDD 并获取该行的前一个值来实现.
The idea: for each line, you need the previous CD_ETAT
. You may do that using a window function or by self-joining or by transforming manually to an RDD and fetching the previous value of the row.
自加入:
val window = Window.partitionBy("ID_NOTIFICATION").orderBy("DT_ETAT")
val df2 = df.withColumn("row_num", row_number().over(window))
df2
.join(df2, col("row_num") === col("row_num")-lit(1))
.filter(col("etat_before") != col("etat_after"))
.select(...)
您只需要稍微重命名即可区分两个数据框(假设列具有相同的名称),但您有想法.
You just have to do a little bit of renaming to differentiate the two dataframes (given the columns have the same names) but you have the idea.
窗口函数:
df
.withColumn("PREV_ETAT", lag($"CD_ETAT", 1).over(window))
.filter(col("PREV_ETAT") != col("CD_ETAT"))
.select("ID_NOTIFICATION", "CD_ETAT", "TYP_MVT", "DT_FIN")
使用 RDD:
case class LineBefore(ID_NOTIFICATION: Int, CD_ETAT: String, TYP_MVT: String, DT_FIN: Date)
case class LineAfter(ID_NOTIFICATION: Int, CD_ETAT: String, TYP_MVT: String, DT_FIN: Date, PREV_ETAT: String)
df
.as[LineBefore]
.rdd
.groupBy(_.ID_NOTIFICATION)
.orderBy(_.DT_FIN)
.flatMap { case (id, events) =>
var prev_etat = null
var etat_changed = true
events.map { e =>
etat_changed = prev_etat != e.CD_ETAT
if (etat_changed)
Some(LineAfter(e.ID_NOTIFICATION, e.CD_ETAT, e.TYP_MVT, e.DT_FIN, prev_etat)
else
None
prev_etat = e.CD_ETAT
}
}.filter(_.isDefined).map(_.get)
希望这会有所帮助.如果没有,请随时与我们联系,否则请接受答案.
Hope this helps. Do not hesitate to reach out if this doesn't or to accept the answer otherwise.
这篇关于每次特定列更改后获取一行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!