Getting latest based on column condition in Spark Scala is not working
Question
Input DataFrame:
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|DataPartition|TimeStamp |OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|Japan |2018-05-03T09:52:48+00:00|4295876589 |194 |2719 |3023331 |AOP |3010542 |true |false |true |O|!| |null |null |null |null |
|Japan |2018-05-03T09:52:48+00:00|4295876589 |195 |16157 |1002485247 |UWE |3010547 |true |false |false |O|!| |null |null |null |null |
|Japan |2018-05-03T07:36:47+00:00|4295876589 |196 |3252 |3024053 |ONC |3020538 |true |false |true |O|!| |null |null |null |null |
|Japan |2018-05-03T07:36:47+00:00|4295876589 |195 |5937 |3026578 |NOP |3010543 |true |false |true |O|!| |null |null |null |null |
|Japan |2018-05-03T08:10:19+00:00|4295876589 |196 |null |null |null |null |null |null |null |D|!| |null |null |null |null |
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
This is what I am doing:
val windowSpec2 = Window.partitionBy("OrganizationID", "SourceID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
.filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
.drop("tobefiltered", "TimeStamp")
Output DataFrame:
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|DataPartition|OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|Japan |4295876589 |195 |16157 |1002485247 |UWE |3010547 |true |false |false |O|!| |null |null |null |null |
|Japan |4295876589 |195 |5937 |3026578 |NOP |3010543 |true |false |true |O|!| |null |null |null |null |
|Japan |4295876589 |196 |null |null |null |null |null |null |null |D|!| |null |null |null |null |
|Japan |4295876589 |194 |2719 |3023331 |AOP |3010542 |true |false |true |O|!| |null |null |null |null |
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
Here I do not expect repeated rows for the same OrganizationID and SourceID (SourceID 195 appears twice).
Here is another example:
uniqueFundamentalSet PeriodId SourceId StatementTypeCode StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId UpdateReasonEnumerationId FFAction|!| DataPartition PartitionYear TimeStamp
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T09:57:29+00:00
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017 2018-05-10T10:11:15+00:00
192730230775 310 182 INC 500186 null null null null O|!| Japan 2018 2018-05-10T08:30:53+00:00
This is what I am doing:
val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", row_number().over(windowSpec2))
.filter(($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|" || ($"FFAction|!|" === "D|!|" && $"FFAction|!|" === "D|!|")) && $"tobefiltered" === 1)
.drop("tobefiltered", "TimeStamp")
But I am not getting the latest record.
I get:
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
But the latest record is the one with the timestamp 2018-05-10T10:11:15+00:00.
So the final output should be:
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017 2018-05-10T10:11:15+00:00
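Answer
first() is an aggregate function, but because you used it with a window function, its result is applied to every row of the partition again. Each row in a partition therefore receives the same tobefiltered value (the FFAction|!| of the newest row), so the filter keeps every row of a partition whose newest action is I|!| or O|!|.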
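To make the difference concrete, here is a minimal plain-Scala sketch (no Spark involved) that models both window computations on the question's five rows. The Rec case class and the WindowModel object are hypothetical names introduced only for illustration. Only SourceID, TimeStamp and FFAction|!| are kept, and comparing the timestamps as strings is an assumption that holds here because every value shares the +00:00 offset.

```scala
// Hypothetical, cut-down record: only the fields that drive the result.
final case class Rec(sourceId: Int, timeStamp: String, action: String)

object WindowModel {
  // The five input rows from the question (OrganizationID is identical for all).
  val rows: List[Rec] = List(
    Rec(194, "2018-05-03T09:52:48+00:00", "O|!|"),
    Rec(195, "2018-05-03T09:52:48+00:00", "O|!|"),
    Rec(196, "2018-05-03T07:36:47+00:00", "O|!|"),
    Rec(195, "2018-05-03T07:36:47+00:00", "O|!|"),
    Rec(196, "2018-05-03T08:10:19+00:00", "D|!|")
  )

  private val allowed = Set("I|!|", "O|!|", "D|!|")

  // One list per key, newest row first (models partitionBy + orderBy desc).
  // Assumption: string order equals time order, since all offsets are +00:00.
  private def partitions: List[List[Rec]] =
    rows.groupBy(_.sourceId).values.toList.map(_.sortBy(_.timeStamp).reverse)

  // Models first("FFAction|!|").over(window): every row in a partition
  // sees the action of the newest row, then the question's filter is applied.
  def keepByFirst: List[Rec] = partitions.flatMap { p =>
    val newest = p.head.action
    p.filter { r =>
      newest == "I|!|" || newest == "O|!|" ||
        (newest == "D|!|" && r.action == "D|!|")
    }
  }

  // Models row_number().over(window) === 1 combined with the action check.
  def keepByRowNumber: List[Rec] =
    partitions.flatMap(p => p.take(1).filter(r => allowed(r.action)))
}
```

Under this model, keepByFirst returns four rows (both SourceID 195 rows survive), which matches the question's output, while keepByRowNumber returns three rows, one per key.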
All you need is the built-in row_number() function, used with the same window, followed by a filter that keeps only row number 1:
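import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, unix_timestamp}
// Assumes spark.implicits._ is already imported for the $"..." syntax.
// Number the rows of each key from newest to oldest.
val windowSpec2 = Window.partitionBy("OrganizationID", "SourceID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
// Keep only the newest row per key, and only if its action is I, O or D.
val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", row_number().over(windowSpec2))
.filter(($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|" || $"FFAction|!|" === "D|!|") && $"tobefiltered" === 1)
.drop("tobefiltered", "TimeStamp")
latestForEachKey.show(false)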
which should give you:
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|DataPartition|OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+
|Japan |4295876589 |195 |16157 |1002485247 |UWE |3010547 |true |false |false |O|!| |null |null |null |null |
|Japan |4295876589 |196 |null |null |null |null |null |null |null |D|!| |null |null |null |null |
|Japan |4295876589 |194 |2719 |3023331 |AOP |3010542 |true |false |true |O|!| |null |null |null |null |
+-------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+