Why doesn't the offset get updated when messages are consumed in Kafka?


Problem description

I am implementing a Kafka consumer class to receive messages. I want to receive only the new messages each time, so I set enable.auto.commit to true. However, the offset does not seem to change at all, even though the topic, consumer group, and partition have always been the same.

Here is my consumer code:

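    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // bootstrap and KafkaTestConstants come from the surrounding test code
    Properties consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", bootstrap);
    consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    consumerConfig.put("enable.auto.commit", "true");
    consumerConfig.put("auto.offset.reset", "earliest");
    consumerConfig.put("auto.commit.interval.ms", 1000);
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

    // Manually assign partition 0 of the topic (no group-managed subscription)
    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);
    long offset = kafkaConsumer.position(tp);
    System.out.println("Offset of partition: " + offset);

    List<String> messages = new ArrayList<String>();
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Message received: " + record.value());
        messages.add(record.value());
    }

    kafkaConsumer.commitAsync();
    System.out.println("Offset committed.\n");
    kafkaConsumer.close();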

No matter how many times I run it, it always shows the offset as 0, so it always receives all messages from the very beginning. What am I missing?

Based on Matthias's answer, I decided to commit the offset manually. However, commitSync() would hang, while commitAsync() sort of works; I will explain the "sort of" below. Here is what the code does:

producer sends 2 messages;
consumer is created;
print the current position;
consumer.poll();
print the received messages;
consumer.commitAsync();

This is how the code behaves. Say I have 100 messages, and the producer then sends 2 new messages. Before the consumer polls, it shows the current position as 102 when it should be 100, so no new messages are printed. It is almost as if the offset is updated as soon as the producer sends the messages.

Recommended answer

Auto-commit only works if you use consumer group management, and for that you need to "subscribe" to a topic rather than "assign" partitions manually.

See the JavaDocs of KafkaConsumer. They are a long read, but required to understand the subtle details of how to use the consumer correctly: https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Furthermore, if auto-commit is enabled, the commit happens inside poll() (i.e., a call to poll() may commit the messages returned by the previous call to poll()), not while you iterate through the returned messages. This also means that your commits will "jump" forward, e.g. from committed offset 0 to 100 if a single poll returned 100 messages from one partition.
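The commit timing described above can be made concrete with a small toy model in plain Java. This is a sketch, not the Kafka client API: it talks to no broker, and the class `AutoCommitModel` and its methods are hypothetical. It assumes a single partition and that the auto-commit interval has always elapsed, so every `poll()` first commits the position reached by the previous `poll()` and only then fetches the next batch.

```java
/**
 * Toy model of auto-commit timing for a single partition.
 * Hypothetical class: NOT the Kafka client API, and no broker is involved.
 * Assumption: the auto-commit interval has always elapsed, so every poll()
 * commits the position reached by the previous poll() before fetching.
 */
public class AutoCommitModel {
    private final long logEndOffset; // offset just past the last record in the partition
    private final int maxBatch;      // maximum number of records one poll() returns
    private long position = 0;       // offset of the next record to fetch
    private long committed = 0;      // last committed offset

    public AutoCommitModel(long logEndOffset, int maxBatch) {
        this.logEndOffset = logEndOffset;
        this.maxBatch = maxBatch;
    }

    /** Commits the previous batch, then "fetches" up to maxBatch records; returns how many. */
    public int poll() {
        // Auto-commit step: everything returned by the previous poll() is now committed.
        committed = position;
        // Fetch step: advance the position past the records handed to the caller.
        long fetched = Math.min(maxBatch, logEndOffset - position);
        position += fetched;
        return (int) fetched;
    }

    public long position() { return position; }
    public long committed() { return committed; }

    public static void main(String[] args) {
        AutoCommitModel consumer = new AutoCommitModel(100, 500);
        int first = consumer.poll();
        System.out.println("poll 1: " + first + " records, committed=" + consumer.committed());
        int second = consumer.poll();
        System.out.println("poll 2: " + second + " records, committed=" + consumer.committed());
    }
}
```

With 100 records and one poll returning all of them, the model reports a committed offset of 0 after the first poll and 100 after the second, i.e. the commit lags one poll behind and then jumps by a whole batch. With a smaller batch size (say 40), the committed offset advances in batch-sized steps instead: 0, 40, 80, 100.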
