Get the row corresponding to the latest timestamp in a Spark Dataset using Scala

Problem description

I am relatively new to Spark and Scala. I have a DataFrame in the following format:

| Col1 | Col2 | Col3 | Col_4 | Col_5 | Col_TS                  | Col_7 | 

| 1234 | AAAA | 1111 | afsdf | ewqre | 1970-01-01 00:00:00.0   | false |
| 1234 | AAAA | 1111 | ewqrw | dafda | 2017-01-17 07:09:32.748 | true  |
| 1234 | AAAA | 1111 | dafsd | afwew | 2015-01-17 07:09:32.748 | false |
| 5678 | BBBB | 2222 | afsdf | qwerq | 1970-01-01 00:00:00.0   | true  |
| 5678 | BBBB | 2222 | bafva | qweqe | 2016-12-08 07:58:43.04  | false |
| 9101 | CCCC | 3333 | caxad | fsdaa | 1970-01-01 00:00:00.0   | false |

What I need to do is get the row that corresponds to the latest timestamp. In the example above, the keys are Col1, Col2 and Col3. Col_TS represents the timestamp, and Col_7 is a boolean that determines the validity of the record. What I want to do is find a way to group these records by the keys and retain the one that has the latest timestamp.

So the output of the operation on the DataFrame above should be:

| Col1 | Col2 | Col3 | Col_4 | Col_5 | Col_TS                  | Col_7 | 

| 1234 | AAAA | 1111 | ewqrw | dafda | 2017-01-17 07:09:32.748 | true  |
| 5678 | BBBB | 2222 | bafva | qweqe | 2016-12-08 07:58:43.04  | false |
| 9101 | CCCC | 3333 | caxad | fsdaa | 1970-01-01 00:00:00.0   | false |

I came up with a partial solution, but this way I can only return a DataFrame containing the key columns on which the records are grouped, not the other columns.

import org.apache.spark.sql.functions.max

// Only the key columns and max(Col_TS) survive this aggregation
val grouped = df.groupBy("Col1", "Col2", "Col3").agg(max("Col_TS"))


| Col1 | Col2 | Col3 | max(Col_TS)             |

| 1234 | AAAA | 1111 | 2017-01-17 07:09:32.748 |
| 5678 | BBBB | 2222 | 2016-12-08 07:58:43.04  | 
| 9101 | CCCC | 3333 | 1970-01-01 00:00:00.0   | 
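
One way to complete this partial approach (a sketch, not part of the original question, assuming the column names shown above) is to join the per-key maxima back to the original DataFrame so the remaining columns are recovered:

import org.apache.spark.sql.functions.max

// Alias the aggregated maximum back to Col_TS so it can serve as a join key
val latestPerKey = df
  .groupBy("Col1", "Col2", "Col3")
  .agg(max("Col_TS").as("Col_TS"))

// Inner join on the keys plus the timestamp recovers the latest full row per key
val result = df.join(latestPerKey, Seq("Col1", "Col2", "Col3", "Col_TS"))
result.show(false)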

Can someone help me come up with Scala code for performing this operation?

Recommended answer

You can use a window function as follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Partition by the key columns and order by the timestamp, newest first
val windowSpec = Window.partitionBy("Col1", "Col2", "Col3").orderBy(col("Col_TS").desc)

df.withColumn("maxTS", first("Col_TS").over(windowSpec))  // latest timestamp within each key group
  .where(col("maxTS") === col("Col_TS"))                  // keep only the rows carrying that timestamp
  .drop("maxTS")
  .show(false)

You should get the following output:

+----+----+----+-----+-----+-----------------------+-----+
|Col1|Col2|Col3|Col_4|Col_5|Col_TS                 |Col_7|
+----+----+----+-----+-----+-----------------------+-----+
|5678|BBBB|2222|bafva|qweqe|2016-12-08 07:58:43.04 |false|
|1234|AAAA|1111|ewqrw|dafda|2017-01-17 07:09:32.748|true |
|9101|CCCC|3333|caxad|fsdaa|1970-01-01 00:00:00.0  |false|
+----+----+----+-----+-----+-----------------------+-----+
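
An equivalent variation (a sketch, not from the original answer) uses row_number over the same window, which guarantees exactly one row per key even if two rows share the latest timestamp:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank the rows within each key, newest timestamp first
val byKeyNewestFirst = Window.partitionBy("Col1", "Col2", "Col3").orderBy(col("Col_TS").desc)

df.withColumn("rn", row_number().over(byKeyNewestFirst))  // rn = 1 marks the newest row per key
  .where(col("rn") === 1)
  .drop("rn")
  .show(false)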
