根据 Spark Scala 中的列条件获取最新信息不起作用 [英] Getting latest based on column condition in Spark Scala is not working

查看:17
本文介绍了根据 Spark Scala 中的列条件获取最新信息不起作用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

数据框

+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|DataPartition|TimeStamp                |OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|Japan        |2018-05-03T09:52:48+00:00|4295876589    |194     |2719     |3023331             |AOP               |3010542         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |
|Japan        |2018-05-03T09:52:48+00:00|4295876589    |195     |16157    |1002485247          |UWE               |3010547         |true                |false                  |false                  |O|!|       |null                               |null                            |null                              |null                          |
|Japan        |2018-05-03T07:36:47+00:00|4295876589    |196     |3252     |3024053             |ONC               |3020538         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |
|Japan        |2018-05-03T07:36:47+00:00|4295876589    |195     |5937     |3026578             |NOP               |3010543         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |
|Japan        |2018-05-03T08:10:19+00:00|4295876589    |196     |null     |null                |null              |null            |null                |null                   |null                   |D|!|       |null                               |null                            |null                              |null                          |
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+

这就是我正在做的事情

val windowSpec2 = Window.partitionBy("OrganizationID", "SourceID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
      .filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
      .drop("tobefiltered", "TimeStamp")

输出数据框

+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|DataPartition|OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|Japan        |4295876589    |195     |16157    |1002485247          |UWE               |3010547         |true                |false                  |false                  |O|!|       |null                               |null                            |null                              |null                          |
|Japan        |4295876589    |195     |5937     |3026578             |NOP               |3010543         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |
|Japan        |4295876589    |196     |null     |null                |null              |null            |null                |null                   |null                   |D|!|       |null                               |null                            |null                              |null                          |
|Japan        |4295876589    |194     |2719     |3023331             |AOP               |3010542         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+

这里我不期望两行具有相同的列 OrganizationIDSourceID

Here i am not expecting two rows for same value of Columns OrganizationID and SourceID

这是另一个例子

uniqueFundamentalSet    PeriodId    SourceId    StatementTypeCode   StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId  UpdateReasonEnumerationId   FFAction|!| DataPartition   PartitionYear   TimeStamp
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    310 182 INC 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T08:30:53+00:00

这就是我正在做的

val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", row_number().over(windowSpec2))
      .filter(($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|" || ($"FFAction|!|" === "D|!|" && $"FFAction|!|" === "D|!|")) && $"tobefiltered" === 1)
      .drop("tobefiltered", "TimeStamp")

但是我没有得到最新的记录.

But I am not getting the latest record .

我得到了这个

192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00

但最新的是有这个时间戳的记录2018-05-10T10:11:15+00:00

But the latest is record that has this time stamp 2018-05-10T10:11:15+00:00

所以最终的输出应该是

192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00

推荐答案

first一个聚合函数,但是因为你已经将它window 一起使用 函数,聚合的输出再次应用到每一行.

first is an aggregation function but since you have used it with window funtion, the output of aggregation is applied to every row again.

您只需要 row_number() 内置函数,与 window 函数一起使用,然后将 filter 用作

All you need is row_number() inbuilt function to be used with window function and to filter later on as

val windowSpec2 = Window.partitionBy("OrganizationID", "SourceID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", row_number().over(windowSpec2))
  .filter(($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|" || ($"FFAction|!|" === "D|!|" && $"FFAction|!|" === "D|!|")) && $"tobefiltered" === 1)
  .drop("tobefiltered", "TimeStamp")

latestForEachKey.show(false)

应该给你

+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|DataPartition|OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|Japan        |4295876589    |195     |16157    |1002485247          |UWE               |3010547         |true                |false                  |false                  |O|!|       |null                               |null                            |null                              |null                          |
|Japan        |4295876589    |196     |null     |null                |null              |null            |null                |null                   |null                   |D|!|       |null                               |null                            |null                              |null                          |
|Japan        |4295876589    |194     |2719     |3023331             |AOP               |3010542         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+

这篇关于根据 Spark Scala 中的列条件获取最新信息不起作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆