Kafka & Flink duplicate messages on restart

Problem Description

First of all, this is very similar to Kafka consuming the latest message again when I rerun the Flink consumer, but it's not the same. The answer to that question does NOT appear to solve my problem. If I missed something in that answer, then please rephrase the answer, as I clearly missed something.

The problem is exactly the same, though: Flink (the Kafka connector) re-runs the last 3-9 messages it saw before it was shut down.

Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91

My Code

import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 500 ms in exactly-once mode, keeping state on the local filesystem
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "testing")

    // Pipe the testing-in topic straight through to the testing-out topic
    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}

My SBT Dependencies

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)

My Process

(3 terminals; a rough sketch of the Kafka console commands follows the list)

TERM-1 start sbt, run the program
TERM-2 create Kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on the testing-in topic
TERM-3 run kafka-console-consumer on the testing-out topic
TERM-2 send data to the Kafka producer
Wait a couple of seconds (buffers need to flush)
TERM-3 watch the data appear in the testing-out topic
Wait at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch the last few lines of data appear again in the testing-out topic
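
For reference, the console commands behind those steps look roughly like this for Kafka 0.9 (run from the Kafka installation directory; the single-broker, single-partition settings are assumptions based on the localhost setup above, not from the original post):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testing-in
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testing-out
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testing-in
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testing-out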

我的期望

当系统中没有错误时,我希望能够打开和关闭 flink,而无需重新处理在先前运行中成功完成流的消息.

My Expectations

When there are no errors in the system, I expect to be able to turn Flink on and off without reprocessing messages that successfully completed the stream in a prior run.

I've added the call to setStateBackend, thinking that perhaps the default memory backend just didn't remember correctly. That didn't seem to help.

I've removed the call to enableCheckpointing, hoping that perhaps there was a separate mechanism to track state in Flink vs Zookeeper. That didn't seem to help.

I've used different sinks (RollingFileSink, print()), hoping that maybe the bug was in Kafka. That didn't seem to help.

I've rolled back to Flink (and all connectors) v1.1.0 and v1.1.1, hoping that maybe the bug was in the latest version. That didn't seem to help.

I've added the zookeeper.connect config to the properties object, hoping that the comment about it only being useful in 0.8 was wrong. That didn't seem to help.

I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea drfloob). That didn't seem to help.

Help!

Answer

(I've posted the same reply on the JIRA issue; just cross-posting here.)

From your description, I'm assuming you're manually shutting down the job, and then resubmitting it, correct?

Flink does not retain exactly-once guarantees across manual job restarts unless you use savepoints (https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html). The exactly-once guarantee refers to the case where the job fails and then automatically restores itself from previous checkpoints (when checkpointing is enabled, as you did with env.enableCheckpointing(500)).
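
For reference, a manual restart that keeps the consumer position goes through the savepoint CLI. A rough sketch (the job ID, savepoint path, and jar name are placeholders; see the savepoint docs linked above for the details):

# Trigger a savepoint for the running job; the CLI prints the savepoint path
bin/flink savepoint <jobID>

# Cancel the job once the savepoint has been written
bin/flink cancel <jobID>

# Resubmit the job, restoring its state (including Kafka offsets) from the savepoint
bin/flink run -s <pathToSavepoint> path/to/your-job.jar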

What is actually happening is that the Kafka consumer simply starts reading from the existing offsets committed in ZK / Kafka when you manually resubmit the job. Those offsets were committed to ZK / Kafka the first time you executed the job. They are not, however, used for Flink's exactly-once semantics; Flink uses the internally checkpointed Kafka offsets for that. The Kafka consumer commits the offsets back to ZK simply to expose a measure of the job's consumption progress to the outside world (outside of Flink).
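
In other words, the internally checkpointed offsets only come into play on automatic failure recovery. A default restart strategy is already active when checkpointing is enabled; configuring one explicitly would look roughly like this in the question's code (the attempt count and delay are illustrative values, not from the question):

import org.apache.flink.api.common.restartstrategy.RestartStrategies

// env is the StreamExecutionEnvironment from the code above.
// On an automatic restart after a failure, Flink restores the internally
// checkpointed Kafka offsets, which is where exactly-once applies.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3,    // restart attempts (illustrative)
  1000  // delay between attempts, in milliseconds (illustrative)
))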
