Kafka consuming the latest message again when I rerun the Flink consumer


Problem description

I have created a Kafka consumer with the Apache Flink API, written in Scala. Whenever I send messages to a topic, the consumer receives them promptly. However, when I restart the consumer, instead of picking up only new, unconsumed messages, it re-consumes the latest messages that were already sent to that topic.

Here is what I am doing:

  1. Running the producer:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2

  2. Running the consumer:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181") // offsets are committed here
    properties.setProperty("group.id", "test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000) // checkpoint (and thus commit offsets) every 5 seconds
    st.print()
    env.execute()
    

  3. Passing some messages
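A side note that is not part of the original question: with the 0.9 consumer used above, the `auto.offset.reset` property only decides where a consumer group starts when it has *no* committed offset yet (`"earliest"` = beginning of the topic, `"latest"` = only new messages); it does not change what happens on a restart with committed offsets. A minimal sketch of adding it to the properties, with the helper name being illustrative:

```scala
import java.util.Properties

// Hypothetical helper: the consumer properties from the question, plus
// auto.offset.reset. This setting only takes effect when the group has
// no committed offset yet; it does not suppress replay after a restart.
def consumerProps(offsetReset: String): Properties = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("group.id", "test")
  properties.setProperty("auto.offset.reset", offsetReset)
  properties
}

val props = consumerProps("earliest")
println(props.getProperty("auto.offset.reset")) // earliest
```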

Answer

You are running the Kafka consumer with a checkpoint interval of 5 seconds. So every 5 seconds, Flink creates a copy of your operator's state (the offsets) for recovery.

Once a checkpoint is completed, Flink notifies the operator that the checkpoint is finished. On that notification, the Kafka consumer commits its offsets to ZooKeeper. So roughly every 5 seconds, the offsets of the last checkpoint are written into ZooKeeper.
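The two-step cycle described above (snapshot the offset, then "commit" it when the completion notification arrives) can be mimicked with a small Flink-free sketch; all names here are illustrative, not Flink's actual interfaces:

```scala
import scala.collection.mutable

// Toy stand-in for the consumer's checkpoint/commit cycle:
// snapshot() records the current offset under a checkpoint id, and
// notifyCheckpointComplete() "commits" that offset (as to ZooKeeper).
class OffsetTracker {
  private val pending = mutable.Map.empty[Long, Long] // checkpointId -> offset
  var committedOffset: Long = -1L                     // last offset written to "ZK"
  var currentOffset: Long = 0L                        // consumer's read position

  def consume(n: Int): Unit = currentOffset += n

  def snapshot(checkpointId: Long): Unit =
    pending(checkpointId) = currentOffset

  def notifyCheckpointComplete(checkpointId: Long): Unit =
    pending.remove(checkpointId).foreach(off => committedOffset = off)
}

val t = new OffsetTracker
t.consume(10)                // read offsets 0..9
t.snapshot(1)                // checkpoint 1 captures offset 10
t.consume(3)                 // read offsets 10..12 after the snapshot
t.notifyCheckpointComplete(1)
println(t.committedOffset)   // 10 -- the 3 newest records are not yet covered
```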

When you start the Flink job again, it finds the offsets in ZooKeeper and resumes from there. Depending on the timing, all messages received after the last commit to ZooKeeper will be sent again.
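The replay window after a restart can be sketched in a few lines of plain Scala (the function name and numbers are made up for illustration):

```scala
// Simulate a restart: the topic holds `topicSize` messages, and the last
// committed offset was `committedOffset`. The consumer resumes from the
// committed offset, so everything read after that commit is re-delivered.
def replayedOnRestart(topicSize: Int, committedOffset: Int): List[Int] =
  (committedOffset until topicSize).toList // offsets that will be read again

val again = replayedOnRestart(10, 6)
println(again) // List(6, 7, 8, 9)
```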

You cannot avoid this behavior, because the .print() "operator" is not part of the checkpointing; it is meant as a debugging utility. However, a data sink that participates in the checkpointing (for example, the rolling file sink) will ensure that no duplicates are written to the file system.
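The effect of such a checkpoint-aware sink can be approximated by a sink that is idempotent with respect to offsets. This is a toy sketch of that idea, not Flink's actual rolling file sink:

```scala
import scala.collection.mutable

// Toy sink: ignores any record whose offset was already written, so a
// replay after restart produces no duplicates downstream.
class DedupSink {
  private val written = mutable.LinkedHashMap.empty[Long, String]
  def write(offset: Long, value: String): Unit =
    if (!written.contains(offset)) written(offset) = value
  def output: Seq[String] = written.values.toSeq
}

val sink = new DedupSink
Seq(0L -> "a", 1L -> "b", 2L -> "c").foreach { case (o, v) => sink.write(o, v) }
// after a restart, offsets 1..2 are replayed before new data arrives
Seq(1L -> "b", 2L -> "c", 3L -> "d").foreach { case (o, v) => sink.write(o, v) }
println(sink.output.mkString(", ")) // a, b, c, d
```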

