Re-processing/reading Kafka records/messages again - What is the purpose of Consumer Group Offset Reset?

Problem Description

My Kafka topic has 10 records/messages in total across 2 partitions, with 5 messages in each. My consumer group has 2 consumers, and each consumer has already read the 5 messages from its assigned partition. Now I want to re-process/read the messages in the topic from the start (offset 0).

I stopped my Kafka consumers and ran the following command to reset the consumer group's offsets to 0:

./kafka-consumer-groups.sh --group cg1 --reset-offsets --to-offset 0 --topic t1 --execute --bootstrap-server "..."

My expectation was that once I restarted my Kafka consumers, they would start reading records from offset 0 (i.e. the beginning), but that didn't happen: they polled from their last position, i.e. offset 5. Why is that? I then had to make each of my consumers explicitly seek to offset 0 (the beginning) to re-process/read the records. In later test cycles, I didn't even run the above command to reset the offsets for the consumer group.

My question is: if I have to make my consumers explicitly seek to the beginning to re-process/read messages, then what is the purpose of resetting the Kafka consumer group's offsets?

Recommended Answer

Handling Kafka consumer offsets is a bit tricky. A consumer uses the auto.offset.reset config only when its consumer group does not have a valid offset committed in the internal Kafka offsets topic. (ZooKeeper is the other supported offset storage, but recent Kafka versions store committed offsets in the internal __consumer_offsets topic.)

Consider the following scenarios:

  1. A consumer in the consumer group 'group1' has consumed 5 messages from the topic 'testtopic', and the offset details have been committed to the internal Kafka topic. The next time the consumer starts, it will not use the 'auto.offset.reset' config. Instead, it fetches the stored offset and continues fetching messages from that offset.

  2. A consumer in the consumer group 'group2' is started as a new consumer to fetch messages from 'testtopic'. This is a new group, so there are no offset details available in the internal Kafka topic, and the 'auto.offset.reset' config decides where to start: either from the beginning of the topic (earliest) or from the latest offset (only new messages are consumed).
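The two scenarios boil down to a small decision rule. The shell function below is a hypothetical model of that rule for a single partition (it is not Kafka's own code, and the name start_offset and its arguments are illustrative assumptions): a committed offset that is still within the retained log wins, and auto.offset.reset (earliest, latest or none) is consulted only when there is no valid committed offset.

```shell
#!/usr/bin/env bash
# Hypothetical model (not Kafka source code) of where a consumer starts
# reading one partition after joining its group.
#   $1  offset committed by the group, or "none" if nothing was committed
#   $2  auto.offset.reset policy: earliest | latest | none
#   $3  log start offset (earliest offset still retained)
#   $4  log end offset (offset the next message will get)
start_offset() {
  local committed=$1 policy=$2 log_start=$3 log_end=$4
  # A committed offset inside the retained range wins; the policy is ignored.
  if [[ $committed != none ]] && (( committed >= log_start && committed <= log_end )); then
    echo "$committed"
    return 0
  fi
  # No valid committed offset: fall back to auto.offset.reset.
  case $policy in
    earliest) echo "$log_start" ;;
    latest)   echo "$log_end" ;;
    *)        echo "no valid offset for auto.offset.reset=$policy" >&2
              return 1 ;;
  esac
}

start_offset 5 earliest 0 5      # group1: offset 5 committed -> prints 5
start_offset none earliest 0 5   # group2: nothing committed  -> prints 0
start_offset none latest 0 5     # group2 with latest         -> prints 5
```

In the question's setup, group cg1 had already committed offset 5 on each partition, so under this model a restart resumes at 5 no matter what auto.offset.reset is set to.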

From your question, the problem is that the command to reset the offsets did not work, so you had to manually seek to the beginning before starting the consumers. The general form of the reset command is:

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> [--topic <topic_name> or --all-topics] --reset-offsets [--to-earliest or --to-offset <offset>] --execute
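As a hedged sketch, the reset can be wrapped in a shell function that first previews the new offsets with --dry-run and then applies them with --execute. It assumes kafka-consumer-groups.sh (Kafka 0.11 or later) is on PATH and that every consumer in the group has already been stopped; the function name reset_to_earliest is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: reset a group's offsets on one topic to the earliest retained offset.
# Assumes kafka-consumer-groups.sh (Kafka 0.11+) is on PATH and that every
# consumer in the group has already been stopped.
# Usage: reset_to_earliest <bootstrap_server> <group_id> <topic_name>
reset_to_earliest() {
  local bootstrap=$1 group=$2 topic=$3
  local -a args
  args=(--bootstrap-server "$bootstrap" --group "$group"
        --topic "$topic" --reset-offsets --to-earliest)

  # --dry-run only prints the offsets the group would be moved to.
  kafka-consumer-groups.sh "${args[@]}" --dry-run || return 1
  # --execute actually commits the new offsets for the group.
  kafka-consumer-groups.sh "${args[@]}" --execute
}
```

For the question's group and topic, this would be called as reset_to_earliest "<kafka_host:port>" cg1 t1. Running the preview first makes it easy to spot a wrong group or topic before anything is committed.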

There are three possible reasons why the reset command may not work:

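  1. The log retention period is short, and the offset you are trying to reset to is no longer available.
  2. Consumer instances in the consumer group are still running. In either of these first two cases, the reset command may not take effect.
  3. The Kafka version is older than 0.11. The reset-offsets option is only available from Kafka 0.11 onwards.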

From your question, the first and third cases seem unlikely, so please check the second case: stop any running consumer instances and then try resetting the offsets again.
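One way to rule out the second case is to check the group before resetting. The sketch below uses hypothetical helpers (group_is_inactive and safe_reset_to_zero are illustrative names, and kafka-consumer-groups.sh is assumed to be on PATH): it runs --describe, proceeds only if the output contains the "has no active members" message that the tool prints for an empty group, and otherwise refuses to reset.

```shell
#!/usr/bin/env bash
# Succeeds if `--describe` output (read from stdin) reports an empty group.
group_is_inactive() {
  grep -q "has no active members"
}

# Hypothetical helper: reset <group> on <topic> to offset 0 only if no
# consumer in the group is running.
# Usage: safe_reset_to_zero <bootstrap_server> <group_id> <topic_name>
safe_reset_to_zero() {
  local bootstrap=$1 group=$2 topic=$3
  # Only an empty group may have its offsets reset.
  if ! kafka-consumer-groups.sh --bootstrap-server "$bootstrap" \
      --group "$group" --describe | group_is_inactive; then
    echo "group '$group' still has active members (or could not be described); stop them first" >&2
    return 1
  fi
  # Same reset as in the question: move the topic's partitions back to offset 0.
  kafka-consumer-groups.sh --bootstrap-server "$bootstrap" --group "$group" \
    --topic "$topic" --reset-offsets --to-offset 0 --execute
}
```

Treating any other describe output as "active" is deliberately conservative: an error for a mistyped group name, for example, also makes the helper refuse rather than reset.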

The following command can be used to check whether a consumer group has any active consumer instances:

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe

Sample output:

Consumer group 'group1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
intro           0          0               99              99 
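After a reset, the same --describe output can confirm that CURRENT-OFFSET actually moved. The awk-based sketch below (the function name current_offsets is hypothetical) prints the partition and current offset for one topic. It locates the TOPIC, PARTITION and CURRENT-OFFSET columns from the header row instead of hard-coding positions, on the assumption that the column layout differs between Kafka versions (newer releases print a GROUP column first).

```shell
#!/usr/bin/env bash
# Sketch: print "partition current-offset" pairs for one topic from
# `kafka-consumer-groups.sh ... --describe` output read on stdin.
# Usage: kafka-consumer-groups.sh ... --describe | current_offsets <topic_name>
current_offsets() {
  local topic=$1
  awk -v t="$topic" '
    # Header row: remember which column holds each field we need.
    $0 ~ /CURRENT-OFFSET/ {
      for (i = 1; i <= NF; i++) {
        if ($i == "TOPIC") tc = i
        else if ($i == "PARTITION") pc = i
        else if ($i == "CURRENT-OFFSET") cc = i
      }
      next
    }
    # Data rows for the requested topic (skipped until a header was seen).
    tc && $tc == t { print $pc, $cc }
  '
}
```

Piping the sample output above through current_offsets intro prints 0 0, i.e. partition 0 is at current offset 0.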
