spark streaming: select record with max timestamp for each id in dataframe (pyspark)

Problem Description

I have a dataframe with this schema:

 |-- record_id: integer (nullable = true)
 |-- Data1: string (nullable = true)
 |-- Data2: string (nullable = true)
 |-- Data3: string (nullable = true)
 |-- Time: timestamp (nullable = true)

I want to retrieve the latest record for each record_id, i.e. the row with the greatest timestamp.

So, if the data is initially this:

 +----------+---------+---------+---------+-----------------------+
 |record_id |Data1    |Data2    |Data3    |                   Time|
 +----------+---------+---------+---------+-----------------------+
 |        1 | aaa     | null    |  null   | 2018-06-04 21:51:53.0 |
 |        1 | null    | bbbb    |  cccc   | 2018-06-05 21:51:53.0 |
 |        1 | aaa     | null    |  dddd   | 2018-06-06 21:51:53.0 |
 |        1 | qqqq    | wwww    |  eeee   | 2018-06-07 21:51:53.0 |
 |        2 | aaa     | null    |  null   | 2018-06-04 21:51:53.0 |
 |        2 | aaaa    | bbbb    |  cccc   | 2018-06-05 21:51:53.0 |
 |        3 | aaa     | null    |  dddd   | 2018-06-06 21:51:53.0 |
 |        3 | aaaa    | bbbb    |  eeee   | 2018-06-08 21:51:53.0 |

I want the output to be:

 +----------+---------+---------+---------+-----------------------+
 |record_id |Data1    |Data2    |Data3    |                   Time|
 +----------+---------+---------+---------+-----------------------+
 |        1 | qqqq    | wwww    |  eeee   | 2018-06-07 21:51:53.0 |
 |        2 | aaaa    | bbbb    |  cccc   | 2018-06-05 21:51:53.0 |
 |        3 | aaaa    | bbbb    |  eeee   | 2018-06-08 21:51:53.0 |

I tried joining 2 queries on the same stream, similar to the answer here. My code (where df1 is the original dataframe):

from pyspark.sql.functions import col

# Watermark the source stream and register it for SQL.
df1 = df1.withWatermark("Timetemp", "2 seconds")
df1.createOrReplaceTempView("tbl")
df1.printSchema()

# Aggregate: latest timestamp per record_id.
query = "select t.record_id as record_id, max(t.Timetemp) as Timetemp from tbl t group by t.record_id"
df2 = spark.sql(query)
df2 = df2.withWatermark("Timetemp", "2 seconds")

# Join the aggregate back to the original stream on id and timestamp.
qws = df1.alias('a').join(df2.alias('b'), ((col('a.record_id') == col('b.record_id')) & (col("a.Timetemp") == col("b.Timetemp"))))

query = qws.writeStream.outputMode('append').format('console').start()

query.awaitTermination()

I keep getting this error:

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

This happens even though a watermark is clearly set. What can be done? I cannot use a window function, since non-time-based windowing is not supported in streaming.
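
For reference, on a static (batch) DataFrame this reduces to a row_number over a window partitioned by record_id; a minimal sketch, assuming the column names from the schema above, is below. It is exactly this non-time-based window that structured streaming rejects:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Batch-only approach: rank rows per record_id by descending timestamp
# and keep the newest one. On a streaming DataFrame this fails, because
# non-time-based windows are not supported on streaming datasets.
w = Window.partitionBy("record_id").orderBy(F.col("Time").desc())
latest = (df1
          .withColumn("rn", F.row_number().over(w))
          .filter(F.col("rn") == 1)
          .drop("rn"))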

Recommended Answer

I had the same task. I tried several options, such as adding a current_timestamp column to the dataset and then grouping by window and record ID with watermarking, but nothing worked.

As far as I found, there is no API that solves this task directly. A window with partitioning and ordering does not work on streaming datasets.

I solved this task using the mapGroupsWithState API, but without actually keeping any state, as follows:

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import scala.concurrent.duration._
import spark.implicits._

val stream = spark.readStream
  .option("maxFileAge", "24h")
  .option("maxFilesPerTrigger", "1000")
  .parquet(sourcePath)
  .as[input.Data]

val mostRecentRowPerPrimaryKey =
  stream
    .groupByKey(_.id)
    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(takeMostRecentOnly)

mostRecentRowPerPrimaryKey
  .repartition(5)
  .writeStream
  .option("checkpointLocation", s"${streamingConfig.checkpointBasePath}/$streamName")
  .option("truncate", "false")
  .format("console")
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(60.seconds))
  .queryName(streamName)
  .start()

// For each key, return only the row with the latest last_modified;
// the GroupState is deliberately left untouched.
def takeMostRecentOnly(pk: Long, values: Iterator[input.Data], state: GroupState[input.Data]): input.Data = {
  values.maxBy(_.last_modified)
}

NOTE: this only works in Update output mode.
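
Since the question is tagged pyspark, here is a rough Python analogue of the same idea. It is only a sketch, assuming Spark 3.4+ where GroupedData.applyInPandasWithState is available (earlier PySpark versions have no direct mapGroupsWithState equivalent); column names follow the question's schema and df1 is the streaming DataFrame from the question:

import pandas as pd
from pyspark.sql.streaming.state import GroupStateTimeout

# Assumed output/state schemas for illustration; no state is actually kept,
# mirroring the Scala answer above.
output_schema = "record_id INT, Data1 STRING, Data2 STRING, Data3 STRING, Time TIMESTAMP"
state_schema = "dummy INT"

def take_most_recent_only(key, pdfs, state):
    # Concatenate this trigger's rows for the key and keep only the latest one.
    batch = pd.concat(pdfs)
    yield batch.sort_values("Time").tail(1)

latest = (df1
          .groupBy("record_id")
          .applyInPandasWithState(
              take_most_recent_only,
              outputStructType=output_schema,
              stateStructType=state_schema,
              outputMode="update",
              timeoutConf=GroupStateTimeout.NoTimeout))

query = (latest.writeStream
         .outputMode("update")
         .format("console")
         .option("truncate", "false")
         .start())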

Hope it helps!
