How to deduplicate and keep latest based on timestamp field in spark structured streaming?


Problem description

Spark dropDuplicates keeps the first instance and ignores all subsequent occurrences for that key. Is it possible to remove duplicates while keeping the most recent occurrence?

For example, if below are the micro-batches that I get, then I want to keep the most recent record (sorted on the timestamp field) for each country.

Batch Id: 0

Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06

Batch Id: 1

Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:03

Then the output after batchId 1 should be as below -

Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:06
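As an illustration of the desired semantics, the combined records of the two batches above can be reduced to the latest row per country in plain Scala. The `Event` case class and `latestPerCountry` are hypothetical names chosen for this sketch, not from the original code; inside Spark this would typically be done with a window function partitioned by country and ordered by timestamp descending.

```scala
// Illustrative record type mirroring the sample data above.
case class Event(country: String, count: Int, ts: String)

// Keep the most recent event per country. Timestamps in the fixed
// "yyyy-MM-dd HH:mm:ss" format compare correctly as plain strings.
def latestPerCountry(events: Seq[Event]): Map[String, Event] =
  events.groupBy(_.country).map { case (c, es) => c -> es.maxBy(_.ts) }

val batch0 = Seq(
  Event("Australia", 10, "2020-05-05 00:00:06"),
  Event("Belarus", 10, "2020-05-05 00:00:06"))
val batch1 = Seq(
  Event("Australia", 10, "2020-05-05 00:00:08"),
  Event("Belarus", 10, "2020-05-05 00:00:03"))

val latest = latestPerCountry(batch0 ++ batch1)
// Australia -> 00:00:08, Belarus -> 00:00:06, matching the expected output
```

Note this only works when all relevant records are visible at once; across separate micro-batches Spark needs state, which is what the accepted answer below addresses.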

Update-1: This is the current code that I have

// kafkaDF is a streaming DataFrame created from Kafka as source
val streamingDF = kafkaDF.dropDuplicates("country")

streamingDF.writeStream
    .trigger(Trigger.ProcessingTime(10000L))
    .outputMode("update")
    .foreachBatch {
      (batchDF: DataFrame, batchId: Long) => {
        println("batchId: "+ batchId)
        batchDF.show()
      }
    }.start()

I want to output all rows which are either new or have a greater timestamp than any record in previous batches processed so far. Example below.

After batchId: 0 - Both countries appeared for the first time, so I should get them in the output

Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06

After batchId: 1 - Belarus's timestamp is older than what I received in batch 0, so I don't display it in the output. Australia is displayed because its timestamp is more recent than what I have seen so far.

Australia, 10, 2020-05-05 00:00:08

Now let's say batchId 2 comes up with both records as late arrivals; then it should not display anything in the output for that batch.

Input batchId: 2

Australia, 10, 2020-05-05 00:00:01
Belarus, 10, 2020-05-05 00:00:01

After batchId: 2

(empty output)

Update-2

Adding input and expected records for each batch. Rows marked with red color are discarded and not shown in the output, as another row with the same country name and a more recent timestamp was seen in a previous batch.

Answer

In order to handle late-arriving events in a streaming app, you need to keep state in your application that tracks the latest processed event per key; in your case the key is country.

case class AppState(country:String, latestTs:java.sql.Timestamp)

Within a micro-batch you might receive multiple events per key. When you do groupByKey(_.country) you get the events belonging to a key (country); you need to compare them against the state to find the latest input event, update the state with the latest timestamp for that key, and proceed with the latest event for further processing. For late-arriving events it should return an Option[Event] of None, which is filtered out in subsequent processing.
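The compare-against-state logic described above can be sketched in plain Scala, outside Spark, to show the per-batch behaviour on the question's data. `Event`, the `Map`-based state, and `processBatch` are illustrative names assumed for this sketch; timestamps are kept as fixed-format strings so they compare chronologically. In a real job this logic would live inside a stateful operator such as flatMapGroupsWithState.

```scala
case class Event(country: String, count: Int, ts: String)

// State maps each country to the latest timestamp processed so far.
// For each incoming event: emit it and update the state if it is newer
// than what the state holds for its country; otherwise drop it (late arrival).
def processBatch(state: Map[String, String],
                 batch: Seq[Event]): (Seq[Event], Map[String, String]) =
  batch.foldLeft((Vector.empty[Event], state)) { case ((out, st), e) =>
    if (st.get(e.country).forall(_ < e.ts))
      (out :+ e, st.updated(e.country, e.ts))
    else
      (out, st)
  }

val b0 = Seq(Event("Australia", 10, "2020-05-05 00:00:06"),
             Event("Belarus",   10, "2020-05-05 00:00:06"))
val b1 = Seq(Event("Australia", 10, "2020-05-05 00:00:08"),
             Event("Belarus",   10, "2020-05-05 00:00:03"))
val b2 = Seq(Event("Australia", 10, "2020-05-05 00:00:01"),
             Event("Belarus",   10, "2020-05-05 00:00:01"))

val (out0, state0) = processBatch(Map.empty, b0) // emits both countries
val (out1, state1) = processBatch(state0, b1)    // emits only Australia
val (out2, _)      = processBatch(state1, b2)    // emits nothing: both are late
```

Running the three batches through reproduces the expected outputs from the question: both rows after batch 0, only Australia after batch 1, and nothing after batch 2.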

See the blog for a detailed explanation.

