再次重新处理/读取 Kafka 记录/消息 - 消费者组偏移重置的目的是什么? [英] Re-processing/reading Kafka records/messages again - What is the purpose of Consumer Group Offset Reset?

查看:40
本文介绍了再次重新处理/读取 Kafka 记录/消息 - 消费者组偏移重置的目的是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的 kafka 主题总共有 10 条记录/消息,2 个分区,每个分区有 5 条消息.我的消费者组有 2 个消费者,每个消费者已经分别从他们分配的分区中读取了 5 条消息.现在,我想从开始/开始(偏移量 0)重新处理/读取来自我的主题的消息.

My kafka topic has 10 records/messages in total and 2 partitions having 5 messages each. My consumer group has 2 consumers and each of the consumer has already read 5 messages from their assigned partition respectively. Now, I want to re-process/read messages from my topic from start/beginning (offset 0).

我停止了我的 kafka 消费者并运行以下命令将消费者组偏移重置为 0.

I stopped my kafka consumers and ran following command to reset consumer group offset to 0.

./kafka-consumer-groups.sh --group cg1 --reset-offsets --to-offset 0 --topic t1 --execute --bootstrap-server "..."

我的期望是,一旦我重新启动我的 kafka 消费者,他们将开始从偏移量 0 即开始读取记录,但这并没有发生,他们从最后一个位置(即偏移量 5)进行轮询.为什么会这样?然后我必须让我的每个消费者明确地寻求偏移 0(开始)以从头开始重新处理/读取记录.在后来的测试周期中,我什至没有运行上面的命令来重置 kafka 消费者组的偏移量.

My expectation was that once I restart my kafka consumers they will start reading records from offset 0 i.e. beginning, but that didn't happen and they polled from their last position i.e. offset 5. Why is that so? I then have to make each of my consumers, explicitly seek to offset 0 (beginning) to re-process/read records from the beginning. And in later tests cycles, I didn't even ran above command to reset offset for kafka consumer group.

我的问题是,如果我必须让我的消费者明确地寻求开始让他们再次重新处理/读取消息,那么重置 kafka 消费者组的偏移量的目的是什么?

My question is, if I have to make my consumers explicitly seek to beginning to make them re-process/read messages again, then what's the purpose of resetting the offset of kafka consumer group?

推荐答案

处理 Kafka 消费者偏移有点棘手.消费者程序仅在使用的消费者组没有在内部 Kafka 主题中提交有效偏移量时才使用 auto.offset.reset 配置.(其他支持的偏移存储是 Zookeeper,但内部 Kafka 主题用作偏移存储在最新的 Kafka 版本中).

Handling Kafka consumer offsets is bit more tricky. Consumer program uses auto.offset.reset config only when consumer group used does not have a valid offset committed in an internal Kafka topic.(Other supported offset storage is Zookeeper but internal Kafka topic is used as offset storage in latest Kafka versions).

考虑以下场景:

  1. 名为'group1'的消费者组中的消费者已经消费了来自主题'testtopic'的5条消息,并且偏移量细节提交到内部Kafka主题-下次消费者启动时,它将不会使用'auto.offset.reset' 配置.相反,它将从存储中获取存储的偏移量,并继续从检索到的偏移量中获取消息.

  1. Consumer in consumer group named 'group1' has consumed 5 messages from topic 'testtopic' and offset details are committed to internal Kafka topic- Next time when the consumer starts, it will not use 'auto.offset.reset' config. Instead it will fetch the stored offset from storage and will continue fetch messages from the retrieved offset.

名为group2"的消费者组中的消费者作为新消费者启动以从testtopic"获取消息.这是一个新组,内部 Kafka 主题中没有可用的偏移详细信息 - 现在使用auto.offset.reset"配置来决定从哪里开始;无论是从主题的开头还是从最新的(只会使用新消息).

Consumer in consumer group named 'group2' is started as a new consumer to fetch messages from 'testtopic'. This is new group and there is no offset details available in internal Kafka topic- 'auto.offset.reset' config is used now to decide where to start; either from beginning of the topic or from latest(only new messages will be consumed).

根据您的问题,问题是重置偏移量的命令不起作用,您必须手动寻找开始和启动消费者.

The issue as per your question is that the command to reset offset not working, you have to manually seek to beginning and start consumer.

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> [--topic <topic_name> or --all-topics] --reset-offsets [--to-earliest or --to-offset <offset>] --execute

重置命令不起作用有三种可能.

There are three possibilities for reset command not working.

  1. 日志保留期更短,您尝试重置的偏移量不再可用
  2. 消费者组中的消费者实例正在运行.在这两种情况下,重置偏移命令可能不起作用.
  3. Kafka 版本是 <0.11.重置偏移 API 仅适用于 Kafka 0.11

从你的问题来看,第一种和第三种情况不太可能.请检查第二种情况.停止任何正在运行的消费者实例,然后尝试重置偏移量.

From your question, first and third case is unlikely. Please check for second case. Stop any consumer instance running and then try resetting offsets.

下面的命令可用于检查消费者组是否有活动的消费者实例.

Below command can be used to check whether a consumer group has active consumer instance.

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe

示例输出:

Consumer group 'group1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
intro           0          0               99              99 

这篇关于再次重新处理/读取 Kafka 记录/消息 - 消费者组偏移重置的目的是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆