Kafka 如何使用所需的偏移量提交消费者偏移量 [英] How change Kafka committed consumer offset with required offset
问题描述
我有 Kafka Stream 应用程序.我的应用程序正在成功处理事件.
I have Kafka Stream application. My application is processing the events successfully.
如何使用所需的偏移量更改 Kafka 提交的消费者偏移量以重新处理/跳过事件.我尝试了如何更改主题的起始偏移量?.但我收到节点不存在:"错误.请帮帮我.
How to change Kafka committed consumer offset with required offset to reprocess/skip the events. I tried How to change start offset for topic?. But I got 'Node does not exist:' error. Please help me.
推荐答案
您所指的问题/答案基于较旧的 Kafka 版本.从 Kafka 0.9 开始,偏移量不再提交给 ZooKeeper,而是存储在一个名为 offset topic 的特殊 Kafka 主题中(主题名称为 __consumer_offsets
).
The question/answer you are referring to is based on an older Kafka version. Since Kafka 0.9, offsets are not committed to ZooKeeper but store in a special Kafka topic called the offset topic (topic name is __consumer_offsets
).
自 Kafka 1.0 起,命令行工具 bin/kafka-consumer-groups.sh
具有允许设置偏移量的新功能.查看原始 KIP:https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
Since Kafka 1.0, the command line tool bin/kafka-consumer-groups.sh
has a new feature that allow to set offsets. Check out the original KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
该工具也适用于 Kafka 0.11(甚至可能是更旧的 Kafka 版本).
The tool also works for Kafka 0.11 (and maybe even older Kafka versions).
另一种方法是编写自己的工具,该工具使用单个 KafkaConsumer
和相应的 group.id
,订阅您要修改偏移量的主题对于,seek()
和 commit()
偏移量.(注意,您应该为此消费者禁用自动提交.)
An alternative is to write your own tool, that uses an single KafkaConsumer
with the corresponding group.id
, subscribe to the topic(s) you want to modify offsets for, seek()
and commit()
offsets. (Note, you should disable auto commit for this consumer.)
这篇关于Kafka 如何使用所需的偏移量提交消费者偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!