Why doesn't the offset get updated when messages are consumed in Kafka?


Question

I am implementing a Kafka consumer class to receive messages. I want to get only the new messages on each run, so I set enable.auto.commit to true. However, the offset does not seem to change at all, even though the topic, consumer group, and partition are always the same.

Here is my consumer code:

    consumerConfig.put("bootstrap.servers", bootstrap);
    consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    consumerConfig.put("enable.auto.commit", "true");
    consumerConfig.put("auto.offset.reset", "earliest");
    // Commit interval in milliseconds; the config key ends in ".ms".
    consumerConfig.put("auto.commit.interval.ms", 1000);
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

    // Manually assign partition 0 of the topic (no group-managed subscription).
    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);
    long offset = kafkaConsumer.position(tp);
    System.out.println("Offset of partition: " + offset);

    // Poll once and collect whatever records come back.
    List<String> messages = new ArrayList<String>();
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Message received: " + record.value());
        messages.add(record.value());
    }

    kafkaConsumer.commitAsync();
    System.out.println("Offset committed.\n");
    kafkaConsumer.close();

No matter how many times I run it, it always shows the offset as 0, so it always receives all messages from the very beginning. What am I missing?

Edit: Based on Matthias's answer, I decided to commit the offset manually. However, commitSync() hangs. commitAsync() sort of works; I explain the "sort of" below. Here is what the code does:

producer sends 2 messages;
consumer starts;
print out current position;
consumer.poll();
print received messages;
consumer.commitAsync();

This is how the code behaves. Say the topic already holds 100 messages, and the producer now sends 2 new ones. Before the consumer polls, it reports the current offset position as 102 when it should be 100, so no new messages are printed. It is almost as if the offset were updated after the producer sent the messages.

Recommended answer

Auto-commit only works if you use consumer group management. For that, you need to "subscribe" to a topic rather than "assign" partitions manually.
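The configuration side of such a setup can be sketched with nothing but java.util.Properties. The keys below are Kafka's consumer configuration names; the class name ConsumerConfigSketch, the helper consumerConfig, and the example broker address and group name are hypothetical and only serve the illustration. The resulting Properties would then be passed to the KafkaConsumer constructor, followed by a call to subscribe() with the topic name in place of assign(); that part needs the Kafka client library and a running broker, so it is described in a comment rather than executed here.

```java
import java.util.Properties;

/** Builds consumer settings for a group-managed consumer with auto-commit (illustrative sketch). */
final class ConsumerConfigSketch {

    // Hypothetical helper: collects the settings used in the question into one place.
    static Properties consumerConfig(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        // The interval key carries a ".ms" suffix and is given in milliseconds.
        props.setProperty("auto.commit.interval.ms", "1000");
        // Only applies when the group has no committed offset yet.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed values; replace with a real broker address and group name.
        Properties config = consumerConfig("localhost:9092", "example-group");
        // With the Kafka client on the classpath, the next steps would be:
        //   new KafkaConsumer<String, String>(config) and then subscribe(...) instead of assign(...)
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```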

See the JavaDocs of KafkaConsumer. They are a long read, but necessary for understanding the subtle details of how to use the consumer correctly: https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Furthermore, if auto-commit is enabled, the commit happens inside poll() (i.e., a call to poll() might commit the messages returned by the previous call to poll()), not while you iterate through the returned messages. This also means that your commits will "jump" forward, for example from committed offset 0 to 100 (if a single poll returned 100 messages for one partition).
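The "jump" can be illustrated with a small toy model. This is not Kafka code: AutoCommitModel is a hypothetical class that mimics a single-partition consumer, and it simplifies by committing on every poll() instead of honouring a commit interval. It only shows the ordering described above, where the commit for one batch happens at the start of the next poll().

```java
import java.util.ArrayList;
import java.util.List;

/** Toy single-partition consumer that commits inside poll(); an illustration, not Kafka's implementation. */
final class AutoCommitModel {
    private final List<String> partitionLog; // messages stored in the partition
    private final int maxPollRecords;        // upper bound on records per poll
    private int position = 0;                // offset of the next record to fetch
    private int committed = 0;               // last committed offset

    AutoCommitModel(List<String> partitionLog, int maxPollRecords) {
        this.partitionLog = partitionLog;
        this.maxPollRecords = maxPollRecords;
    }

    /** Commits the position reached by the previous poll, then returns the next batch. */
    List<String> poll() {
        committed = position; // covers the records handed out by the previous call
        int end = Math.min(partitionLog.size(), position + maxPollRecords);
        List<String> batch = new ArrayList<>(partitionLog.subList(position, end));
        position = end;
        return batch;
    }

    int committed() {
        return committed;
    }

    int position() {
        return position;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            log.add("msg-" + i);
        }
        AutoCommitModel consumer = new AutoCommitModel(log, 100);

        List<String> first = consumer.poll();
        // prints "first poll: returned=100 committed=0"
        System.out.println("first poll: returned=" + first.size()
                + " committed=" + consumer.committed());

        List<String> second = consumer.poll();
        // prints "second poll: returned=0 committed=100"
        System.out.println("second poll: returned=" + second.size()
                + " committed=" + consumer.committed());
    }
}
```

In this model, a consumer that polls once and then stops never advances its committed offset, and the committed offset moves from 0 straight to 100 on the second poll rather than one message at a time.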
