Kafka & Flink duplicate messages on restart


Problem Description

First of all, this is very similar to the question "Kafka consuming the latest message again when I rerun the Flink consumer", but it's not the same. The answer to that question does NOT appear to solve my problem. If I missed something in that answer, then please rephrase the answer, as I clearly missed something.

The problem is the exact same, though -- Flink (the kafka connector) re-runs the last 3-9 messages it saw before it was shut down.

My Versions

Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91

My Code

import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 500 ms and keep checkpoint state on the local filesystem
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Kafka connection settings shared by the consumer
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "testing")

    // Pipe every record from the testing-in topic straight to testing-out
    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}

My SBT Dependencies

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)

My Process

(3 terminals)

TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic

My Expectations

When there are no errors in the system, I expect to be able to turn flink on and off without reprocessing messages that successfully completed the stream in a prior run.

My Attempts to Fix

I've added the call to setStateBackend, thinking that perhaps the default memory backend just didn't remember correctly. That didn't seem to help.

I've removed the call to enableCheckpointing, hoping that perhaps there was a separate mechanism to track state in Flink vs Zookeeper. That didn't seem to help.

I've used different sinks, RollingFileSink, print(); hoping that maybe the bug was in kafka. That didn't seem to help.

I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that maybe the bug was in the latest version. That didn't seem to help.

I've added the zookeeper.connect config to the properties object, hoping that the comment about it only being useful in 0.8 was wrong. That didn't seem to help.

I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea drfloob). That didn't seem to help.

My Plea

Help!

Solution

(I've posted the same reply in the JIRA, just cross-posting the same here)

From your description, I'm assuming you're manually shutting down the job, and then resubmitting it, correct?

Flink does not retain exactly-once guarantees across manual job restarts unless you use savepoints (https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html). The exactly-once guarantee applies when the job fails and then automatically restores itself from a previous checkpoint (with checkpointing enabled, as you did with env.enableCheckpointing(500)).

What is actually happening is that, when you manually resubmit the job, the Kafka consumer simply starts reading from the existing offsets committed in ZK / Kafka. These offsets were committed to ZK / Kafka the first time you executed the job. However, they are not used for Flink's exactly-once semantics; Flink uses internally checkpointed Kafka offsets for that. The Kafka consumer commits those offsets back to ZK simply to expose a measure of the job's consumption progress to the outside world (wrt Flink).
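
To make the effect concrete, here is a toy model in plain Scala of why a manual restart can replay the last few messages. It is only a sketch under stated assumptions: it does not call Flink or Kafka, and the names OffsetReplaySketch, Run, run and commitEvery are hypothetical. The one assumption it encodes is that the externally committed offset lags slightly behind what the job has already emitted, so resuming from it re-reads the tail of the previous run.

```scala
// Toy model only: it does not use or reproduce Flink's or Kafka's real APIs.
object OffsetReplaySketch {

  /** Outcome of one simulated run: emitted offsets and the last committed offset. */
  final case class Run(emitted: Vector[Long], committedOffset: Long)

  /**
   * Emits every offset in [start, stop) and records a commit (the next offset
   * to read) after every `commitEvery` records. `commitEvery` is a hypothetical
   * stand-in for "offsets are committed only every so often".
   */
  def run(start: Long, stop: Long, commitEvery: Int): Run = {
    val emitted = Vector.newBuilder[Long]
    var committed = start
    var sinceCommit = 0
    var offset = start
    while (offset < stop) {
      emitted += offset
      sinceCommit += 1
      if (sinceCommit == commitEvery) {
        committed = offset + 1
        sinceCommit = 0
      }
      offset += 1
    }
    Run(emitted.result(), committed)
  }

  def main(args: Array[String]): Unit = {
    // First run: the job is stopped manually after emitting offsets 0 to 9.
    val first = run(start = 0L, stop = 10L, commitEvery = 4)
    // Manual resubmission: reading resumes from the committed offset,
    // not from the position the job had actually reached.
    val second = run(start = first.committedOffset, stop = 12L, commitEvery = 4)
    val replayed = first.emitted.intersect(second.emitted)
    println(s"committed offset after first run: ${first.committedOffset}")
    println(s"offsets emitted twice: ${replayed.mkString(", ")}")
  }
}
```

In this model the first run emits offsets 0 to 9 but the last commit recorded is 8, so the second run emits offsets 8 and 9 again. That mirrors the handful of repeated messages described in the question. The real number of replayed messages depends on when offsets were last committed relative to the shutdown.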
