为什么Flink在数据流连接+全局窗口上发出重复记录? [英] Why does Flink emit duplicate records on a DataStream join + Global window?
本文介绍了为什么Flink在数据流连接+全局窗口上发出重复记录?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在学习/试验Flink,我观察到数据流联接的一些意外行为,希望了解发生的情况.
假设我有两个流,每个流有10条记录,我希望在id
字段上加入这两个流。让我们假设一个流中的每个记录在另一个流中都有一个匹配的记录,并且ID在每个流中都是唯一的。我们还假设我必须使用全局窗口(要求)。
使用数据流API连接(我在Scala中的简化代码):
val stream1 = ... // from a Kafka topic on my local machine (I tried with and without .keyBy)
val stream2 = ...
stream1
.join(stream2)
.where(_.id).equalTo(_.id)
.window(GlobalWindows.create()) // assume this is a requirement
.trigger(CountTrigger.of(1))
.apply {
(row1, row2) => // ...
}
.print()
结果:
- 所有内容都按预期打印,第一个流中的每条记录与第二个流中的一条记录联接。
但是:
- 如果我从其中一个流向该流重新发送其中一个记录(例如,具有更新的字段),则会发出两个重复的联接事件😞
- 如果我重复该操作(使用或不使用更新的字段),我将得到3个发出的事件,然后是4个、5个,依此类推……😞
Flink社区的人能解释一下为什么会发生这种情况吗?我原本预计每次只会发出一个事件。是否可以使用全局窗口实现此目的?
相比之下,Flink Table API在相同方案中的行为与预期相同,但对于我的项目,我更感兴趣的是数据流API。
表接口示例,运行正常:
tableEnv
.sqlQuery(
"""
|SELECT *
| FROM stream1
| JOIN stream2
| ON stream1.id = stream2.id
""".stripMargin)
.toRetractStream[Row]
.filter(_._1) // just keep the inserts
.map(...)
.print() // works as expected, after re-sending updated records
谢谢您,
尼古拉斯
推荐答案
问题是永远不会从全局窗口中删除记录。因此,只要有新记录到达,但旧记录仍然存在,您就会在全局窗口上触发联接操作。
因此,要让它在您的案例中运行,您需要实现一个自定义的evictor。我在一个最小的工作示例中扩展了您的示例,并添加了清除器,我将在代码段之后对其进行说明。val data1 = List(
(1L, "myId-1"),
(2L, "myId-2"),
(5L, "myId-1"),
(9L, "myId-1"))
val data2 = List(
(3L, "myId-1", "myValue-A"))
val stream1 = env.fromCollection(data1)
val stream2 = env.fromCollection(data2)
stream1.join(stream2)
.where(_._2).equalTo(_._2)
.window(GlobalWindows.create()) // assume this is a requirement
.trigger(CountTrigger.of(1))
.evictor(new Evictor[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)], GlobalWindow](){
override def evictBefore(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {}
override def evictAfter(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {
import scala.collection.JavaConverters._
val lastInputTwoIndex = elements.asScala.zipWithIndex.filter(e => e._1.getValue.isTwo).lastOption.map(_._2).getOrElse(-1)
if (lastInputTwoIndex == -1) {
println("Waiting for the lookup value before evicting")
return
}
val iterator = elements.iterator()
for (index <- 0 until size) {
val cur = iterator.next()
if (index != lastInputTwoIndex) {
println(s"evicting ${cur.getValue.getOne}/${cur.getValue.getTwo}")
iterator.remove()
}
}
}
})
.apply((r, l) => (r, l))
.print()
将在应用窗口函数(在本例中为Join)之后应用逐出函数。在第二个输入中有多个条目的情况下,您的用例到底应该如何工作并不完全清楚,但目前,逐出程序只处理单个条目。
每当有新元素进入窗口时,都会立即触发窗口函数(count=1)。然后,使用具有相同键的所有元素评估联接。之后,为了避免重复输出,我们从当前窗口的第一个输入中删除了所有条目。由于第二输入可能在第一输入之后到达,因此当第二输入为空时不执行驱逐。请注意,我的Scala相当生疏;您将能够以更好的方式编写它。运行的输出为:
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
4> ((1,myId-1),(3,myId-1,myValue-A))
4> ((5,myId-1),(3,myId-1,myValue-A))
4> ((9,myId-1),(3,myId-1,myValue-A))
evicting (1,myId-1)/null
evicting (5,myId-1)/null
evicting (9,myId-1)/null
最后一点:如果表API已经提供了您想要的简明方式,我会坚持使用它,然后在需要时convert it to a DataStream。
这篇关于为什么Flink在数据流连接+全局窗口上发出重复记录?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文