每次更改特定列后获取一行 [英] Get a row after each time a certain column changes

查看:96
本文介绍了每次更改特定列后获取一行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

+---------------+-------+-------+-------------------+
|ID_NOTIFICATION|CD_ETAT|TYP_MVT|DT_FIN             |
+---------------+-------+-------+-------------------+
|3111341        |AT     |C      |2019-06-12 00:03:37|
|3111341        |AN     |M      |2019-06-12 15:08:43|
|3111341        |AN     |M      |2019-06-12 15:10:11|
|3111341        |AN     |M      |2019-06-12 15:10:50|
|3111341        |AN     |M      |2019-06-12 15:11:34|
|3111341        |AN     |M      |2019-06-12 15:12:03|
|3111341        |AN     |M      |2019-06-12 15:14:04|
|3111341        |AN     |M      |2019-06-12 15:14:40|
|3111341        |AN     |M      |2019-06-12 15:15:22|
|3111341        |AN     |M      |2019-06-12 15:15:57|
|3111341        |AN     |M      |2019-06-12 15:25:28|
|3111341        |AN     |M      |2019-06-12 15:25:29|
|3111341        |AN     |M      |2019-06-12 15:27:50|
|3111341        |AN     |M      |2019-06-12 15:28:37|
|3111341        |AN     |M      |2019-06-12 15:32:22|
|3111341        |AN     |M      |2019-06-12 15:32:59|
|3111341        |EC     |M      |2019-06-12 15:33:04|
|3111341        |AN     |M      |2019-06-13 00:04:33|
|3111341        |TE     |M      |9999-01-01 00:00:00|
+---------------+-------+-------+-------------------+

在CD_ETAT上进行每次更改后,我需要从上述数据框中提取一行.

I need to extract one row from the above dataframe after each change on CD_ETAT.

此:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("CD_ETAT").orderBy("DT_ETAT")

df.withColumn("row_num", row_number().over(window))
  .filter($"row_num" === 1)
  .drop("row_num")

似乎可以工作,但实际上不起作用,因为它从每个CD_ETAT中仅获得一行.对于上面的示例,它给出:

seemed to work but actually not because it gets only one row from each CD_ETAT. For the above example it gives:

+---------------+-------+-------+-------------------+
|ID_NOTIFICATION|CD_ETAT|TYP_MVT|DT_FIN             |
+---------------+-------+-------+-------------------+
|3111341        |EC     |M      |2019-06-12 15:33:04|
|3111341        |AN     |M      |2019-06-13 00:04:33|
|3111341        |TE     |M      |9999-01-01 00:00:00|
|3111341        |AT     |C      |2019-06-12 00:03:37|
+---------------+-------+-------+-------------------+

但是正确的输出还应该包括输入数据帧的第二行.

But a correct Output would also include the second line of the input dataframe.

在输入的CD_ETAT进行每次更改后,我希望输出一行.

I want one row in output after each change on CD_ETAT in input.

谢谢.

推荐答案

想法:对于每一行,您都需要前一个CD_ETAT.您可以使用窗口函数或通过自联接或通过手动转换为RDD并获取该行的先前值来做到这一点.

The idea: for each line, you need the previous CD_ETAT. You may do that using a window function or by self-joining or by transforming manually to an RDD and fetching the previous value of the row.

自我加入:

val window = Window.partitionBy("ID_NOTIFICATION").orderBy("DT_ETAT")

val df2 = df.withColumn("row_num", row_number().over(window))

df2
  .join(df2, col("row_num") === col("row_num")-lit(1))
  .filter(col("etat_before") != col("etat_after"))
  .select(...)

您只需要做一点重命名就可以区分两个数据框(给定名称相同的列),但是您有了主意.

You just have to do a little bit of renaming to differentiate the two dataframes (given the columns have the same names) but you have the idea.

窗口功能:

df
  .withColumn("PREV_ETAT", lag($"CD_ETAT", 1).over(window))
  .filter(col("PREV_ETAT") != col("CD_ETAT"))
  .select("ID_NOTIFICATION", "CD_ETAT", "TYP_MVT", "DT_FIN")

具有RDD:

case class LineBefore(ID_NOTIFICATION: Int, CD_ETAT: String, TYP_MVT: String, DT_FIN: Date)
case class LineAfter(ID_NOTIFICATION: Int, CD_ETAT: String, TYP_MVT: String, DT_FIN: Date, PREV_ETAT: String)

df
  .as[LineBefore]
  .rdd
  .groupBy(_.ID_NOTIFICATION)
  .orderBy(_.DT_FIN)
  .flatMap { case (id, events) =>

     var prev_etat = null
     var etat_changed = true

     events.map { e =>
       etat_changed = prev_etat != e.CD_ETAT

       if (etat_changed)
         Some(LineAfter(e.ID_NOTIFICATION, e.CD_ETAT, e.TYP_MVT, e.DT_FIN, prev_etat)
       else
         None

       prev_etat = e.CD_ETAT
     }
   }.filter(_.isDefined).map(_.get)

希望这会有所帮助.如果不这样做,请不要犹豫,否则请接受答案.

Hope this helps. Do not hesitate to reach out if this doesn't or to accept the answer otherwise.

这篇关于每次更改特定列后获取一行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆