Kafka & Flink duplicate messages on restart
Problem Description
First of all, this is very similar to "Kafka consuming the latest message again when I rerun the Flink consumer", but it's not the same. The answer to that question does NOT appear to solve my problem. If I missed something in that answer, then please rephrase it, as I clearly missed something.
The problem is exactly the same, though: Flink (the Kafka connector) re-emits the last 3-9 messages it saw before it was shut down.
My Versions
Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
My Code
import java.util.Properties

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 500 ms, store checkpoints on the local filesystem,
    // and request exactly-once checkpointing semantics
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Kafka client settings; "group.id" is the consumer group whose offsets are committed
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "testing")

    // Read strings from "testing-in" and write them unchanged to "testing-out"
    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())

    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}
My SBT Dependencies
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.1.2",
  "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
  "org.apache.flink" %% "flink-clients" % "1.1.2",
  "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
  "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)
My Process
(3 terminals)
TERM-1: start sbt, run the program
TERM-2: create Kafka topics testing-in and testing-out
TERM-2: run kafka-console-producer on the testing-in topic
TERM-3: run kafka-console-consumer on the testing-out topic
TERM-2: send data to the Kafka producer
Wait a couple of seconds (buffers need to flush)
TERM-3: watch the data appear in the testing-out topic
Wait at least 500 milliseconds for checkpointing to happen
TERM-1: stop sbt
TERM-1: run sbt again
TERM-3: watch the last few lines of data appear in the testing-out topic a second time
My Expectations
When there are no errors in the system, I expect to be able to turn Flink on and off without reprocessing messages that successfully completed the stream in a prior run.
My Attempts to Fix
I've added the call to setStateBackend, thinking that perhaps the default memory backend just didn't remember correctly. That didn't seem to help.
I've removed the call to enableCheckpointing, hoping that perhaps there was a separate mechanism to track state in Flink vs. ZooKeeper. That didn't seem to help.
I've used different sinks (RollingFileSink, print()), hoping that maybe the bug was in Kafka. That didn't seem to help.
I've rolled back Flink (and all connectors) to v1.1.0 and v1.1.1, hoping that maybe the bug was in the latest version. That didn't seem to help.
I've added the zookeeper.connect config to the properties object, hoping that the comment about it only being useful in 0.8 was wrong. That didn't seem to help.
I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea, drfloob). That didn't seem to help.
My Plea
Help!
Solution
(I've posted the same reply in the JIRA; cross-posting it here.)
From your description, I'm assuming you're manually shutting down the job, and then resubmitting it, correct?
Flink does not retain exactly-once semantics across manual job restarts unless you use savepoints (https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html). The exactly-once guarantee applies when the job fails and then automatically restores itself from a previous checkpoint (provided checkpointing is enabled, as you did with env.enableCheckpointing(500)).
What is actually happening is that, when you manually resubmit the job, the Kafka consumer simply starts reading from the existing offsets committed in ZK / Kafka. These offsets were committed to ZK / Kafka the first time you executed the job. However, they are not used for Flink's exactly-once semantics; Flink uses internally checkpointed Kafka offsets for that. The Kafka consumer commits those offsets back to ZK only to expose a measure of the job's consumption progress to the outside world (with respect to Flink).
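To make the explanation above concrete, here is a hypothetical, dependency-free Scala model. It uses no Flink or Kafka APIs; `OffsetReplayModel`, `run`, `commitEvery` and `stopAfter` are invented for illustration. The model assumes that the offset committed to ZK / Kafka only advances periodically, so it can trail the last record already written to the sink, and that a manually resubmitted job resumes from that committed offset. Under those assumptions the tail of the first run is emitted a second time, while resuming from an offset captured after the last record (a stand-in for a savepoint) emits nothing new.

```scala
// Hypothetical model only: no Flink or Kafka classes are used.
object OffsetReplayModel {

  /** Output of one simulated run plus the consumer offset that was committed. */
  final case class RunResult(output: Vector[String], committedOffset: Int)

  /** Simulates a source -> sink job that reads `topic` from `startOffset`,
    * writes every record straight to the sink, and stops after `stopAfter`
    * records (a manual shutdown). ASSUMPTION: the offset is committed only
    * after every `commitEvery` records, so it can lag behind the output. */
  def run(topic: Vector[String], startOffset: Int, commitEvery: Int, stopAfter: Int): RunResult = {
    require(commitEvery > 0, "commitEvery must be positive")
    require(stopAfter >= 0, "stopAfter must be non-negative")
    require(startOffset >= 0 && startOffset <= topic.length, "startOffset out of range")
    val end = math.min(topic.length, startOffset + stopAfter)
    val processed = end - startOffset
    // The committed offset is the next record to read after the most recent
    // commit, i.e. the end of the last full batch of `commitEvery` records.
    val committed = startOffset + (processed / commitEvery) * commitEvery
    RunResult(topic.slice(startOffset, end), committed)
  }

  def main(args: Array[String]): Unit = {
    val topic = Vector.tabulate(10)(i => s"msg-$i")

    // First run: all 10 messages reach the sink, but only offsets 0-7 are committed.
    val first = run(topic, startOffset = 0, commitEvery = 4, stopAfter = 10)

    // Manual resubmit without a savepoint: resume from the committed group offset.
    val resubmitted = run(topic, first.committedOffset, commitEvery = 4, stopAfter = 10)

    // Resume from an offset captured after the last record (stand-in for a savepoint).
    val fromSavepoint = run(topic, startOffset = topic.length, commitEvery = 4, stopAfter = 10)

    println(s"committed after first run: ${first.committedOffset}")
    println(s"re-emitted after resubmit: ${resubmitted.output.mkString(", ")}")
    println(s"re-emitted after savepoint-style restore: ${fromSavepoint.output.size}")
  }
}
```

Running `main` reports a committed offset of 8 after the first run, lists msg-8 and msg-9 as re-emitted after the resubmit, and reports 0 re-emitted records after the savepoint-style restore. The 3-9 duplicates in the question would correspond to however far the committed offset trailed the sink in that run; the model does not attempt to reproduce Flink's actual commit timing.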