为什么Flink在数据流连接+全局窗口上发出重复记录? [英] Why does Flink emit duplicate records on a DataStream join + Global window?

查看:10
本文介绍了为什么Flink在数据流连接+全局窗口上发出重复记录?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在学习/试验Flink,我观察到数据流联接的一些意外行为,希望了解发生的情况.

假设我有两个流,每个流有10条记录,我希望在id字段上加入这两个流。让我们假设一个流中的每个记录在另一个流中都有一个匹配的记录,并且ID在每个流中都是唯一的。我们还假设我必须使用全局窗口(要求)。

使用数据流API连接(我在Scala中的简化代码):

val stream1 = ... // from a Kafka topic on my local machine (I tried with and without .keyBy)
val stream2 = ... 

stream1
  .join(stream2)
  .where(_.id).equalTo(_.id)
  .window(GlobalWindows.create()) // assume this is a requirement
  .trigger(CountTrigger.of(1))
  .apply {
    (row1, row2) => // ... 
  }
  .print()

结果:

  • 所有内容都按预期打印,第一个流中的每条记录与第二个流中的一条记录联接。

但是:

  • 如果我从其中一个流向该流重新发送其中一个记录(例如,具有更新的字段),则会发出两个重复的联接事件😞
  • 如果我重复该操作(使用或不使用更新的字段),我将得到3个发出的事件,然后是4个、5个,依此类推……😞

Flink社区的人能解释一下为什么会发生这种情况吗?我原本预计每次只会发出一个事件。是否可以使用全局窗口实现此目的?

相比之下,Flink Table API在相同方案中的行为与预期相同,但对于我的项目,我更感兴趣的是数据流API。

表接口示例,运行正常:

tableEnv
  .sqlQuery(
    """
      |SELECT *
      |  FROM stream1
      |       JOIN stream2
      |       ON stream1.id = stream2.id
    """.stripMargin)
  .toRetractStream[Row]
  .filter(_._1) // just keep the inserts
  .map(...)
  .print() // works as expected, after re-sending updated records

谢谢您,

尼古拉斯

推荐答案

问题是永远不会从全局窗口中删除记录。因此,只要有新记录到达,但旧记录仍然存在,您就会在全局窗口上触发联接操作。

因此,要让它在您的案例中运行,您需要实现一个自定义的evictor。我在一个最小的工作示例中扩展了您的示例,并添加了清除器,我将在代码段之后对其进行说明。

val data1 = List(
  (1L, "myId-1"),
  (2L, "myId-2"),
  (5L, "myId-1"),
  (9L, "myId-1"))

val data2 = List(
  (3L, "myId-1", "myValue-A"))

val stream1 = env.fromCollection(data1)
val stream2 = env.fromCollection(data2)

stream1.join(stream2)
  .where(_._2).equalTo(_._2)
  .window(GlobalWindows.create()) // assume this is a requirement
  .trigger(CountTrigger.of(1))
  .evictor(new Evictor[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)], GlobalWindow](){
    override def evictBefore(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {}

    override def evictAfter(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {
      import scala.collection.JavaConverters._
      val lastInputTwoIndex = elements.asScala.zipWithIndex.filter(e => e._1.getValue.isTwo).lastOption.map(_._2).getOrElse(-1)
      if (lastInputTwoIndex == -1) {
        println("Waiting for the lookup value before evicting")
        return
      }
      val iterator = elements.iterator()
      for (index <- 0 until size) {
        val cur = iterator.next()
        if (index != lastInputTwoIndex) {
          println(s"evicting ${cur.getValue.getOne}/${cur.getValue.getTwo}")
          iterator.remove()
        }
      }
    }
  })
  .apply((r, l) => (r, l))
  .print()
将在应用窗口函数(在本例中为Join)之后应用逐出函数。在第二个输入中有多个条目的情况下,您的用例到底应该如何工作并不完全清楚,但目前,逐出程序只处理单个条目。

每当有新元素进入窗口时,都会立即触发窗口函数(count=1)。然后,使用具有相同键的所有元素评估联接。之后,为了避免重复输出,我们从当前窗口的第一个输入中删除了所有条目。由于第二输入可能在第一输入之后到达,因此当第二输入为空时不执行驱逐。请注意,我的Scala相当生疏;您将能够以更好的方式编写它。运行的输出为:

Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
4> ((1,myId-1),(3,myId-1,myValue-A))
4> ((5,myId-1),(3,myId-1,myValue-A))
4> ((9,myId-1),(3,myId-1,myValue-A))
evicting (1,myId-1)/null
evicting (5,myId-1)/null
evicting (9,myId-1)/null

最后一点:如果表API已经提供了您想要的简明方式,我会坚持使用它,然后在需要时convert it to a DataStream

这篇关于为什么Flink在数据流连接+全局窗口上发出重复记录?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆