Kafka consuming the latest message again when I rerun the Flink consumer

Problem Description

I have created a Kafka consumer with the Apache Flink API, written in Scala. Whenever I pass some messages from a topic, it duly receives them. However, when I restart the consumer, instead of receiving only the new or unconsumed messages, it re-consumes the latest message that was sent to that topic.

This is what I am doing:

  1. Running the producer:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2

  2. Running the consumer:

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read strings from the "corr2" topic.
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    // Checkpoint operator state (including the Kafka offsets) every 5 seconds.
    env.enableCheckpointing(5000)
    st.print()
    env.execute()
    

  3. Passing some messages

Recommended Answer

You are running a Kafka consumer with a checkpoint interval of 5 seconds. So every 5 seconds, Flink creates a copy of your operator's state (the offsets) for recovery.
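
For reference, a minimal sketch of how that checkpoint interval is configured; the explicit CheckpointingMode argument is optional here, since exactly-once is already the default:

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Snapshot operator state (including the Kafka offsets) every 5000 ms.
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)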

Once the checkpoint is completed, Flink lets the operator know that the checkpoint is finished. On that notification, the Kafka consumer commits the offsets to ZooKeeper. So roughly every 5 seconds, the offsets of the last checkpoint are written into ZooKeeper.

When you start the Flink job again, it will find the offsets in ZooKeeper and continue from there. Depending on the timing, all messages received after the last commit to ZooKeeper will be sent again.
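
Note that the standard Kafka 0.9 consumer property auto.offset.reset only controls where the consumer starts when no committed offset exists for the group; it does not prevent the replay described above. A sketch, set on the same Properties object as in the question:

    // Only applies when the group has no committed offset yet:
    // "earliest" = start from the oldest available message,
    // "latest"   = start from new messages only.
    properties.setProperty("auto.offset.reset", "latest")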

You cannot avoid this behavior, because the .print() "operator" is not part of the checkpointing; it is meant as a debugging utility. However, a data sink that participates in checkpointing (for example, the rolling file sink) will ensure that no duplicates are written to the file system.
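
As an illustration, here is a minimal sketch of such a checkpoint-aware sink using the RollingSink from the flink-connector-filesystem module, reusing the st stream from the question (the output path is a hypothetical example):

    import org.apache.flink.streaming.connectors.fs.RollingSink

    // RollingSink participates in checkpointing: on recovery it rolls the
    // output back to the last completed checkpoint, so replayed messages do
    // not end up as duplicates in the files.
    st.addSink(new RollingSink[String]("/tmp/flink-kafka-out")) // hypothetical path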
