Kafka 如何使用所需的偏移量提交消费者偏移量 [英] How change Kafka committed consumer offset with required offset

查看:86
本文介绍了Kafka 如何使用所需的偏移量提交消费者偏移量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有 Kafka Stream 应用程序.我的应用程序正在成功处理事件.

I have Kafka Stream application. My application is processing the events successfully.

如何使用所需的偏移量更改 Kafka 提交的消费者偏移量以重新处理/跳过事件.我尝试了如何更改主题的起始偏移量?.但我收到节点不存在:"错误.请帮帮我.

How to change Kafka committed consumer offset with required offset to reprocess/skip the events. I tried How to change start offset for topic?. But I got 'Node does not exist:' error. Please help me.

推荐答案

您所指的问题/答案基于较旧的 Kafka 版本.从 Kafka 0.9 开始,偏移量不再提交给 ZooKeeper,而是存储在一个名为 offset topic 的特殊 Kafka 主题中(主题名称为 __consumer_offsets).

The question/answer you are referring to is based on an older Kafka version. Since Kafka 0.9, offsets are not committed to ZooKeeper but store in a special Kafka topic called the offset topic (topic name is __consumer_offsets).

自 Kafka 1.0 起,命令行工具 bin/kafka-consumer-groups.sh 具有允许设置偏移量的新功能.查看原始 KIP:https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

Since Kafka 1.0, the command line tool bin/kafka-consumer-groups.sh has a new feature that allow to set offsets. Check out the original KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

该工具也适用于 Kafka 0.11(甚至可能是更旧的 Kafka 版本).

The tool also works for Kafka 0.11 (and maybe even older Kafka versions).

另一种方法是编写自己的工具,该工具使用单个 KafkaConsumer 和相应的 group.id,订阅您要修改偏移量的主题对于,seek()commit() 偏移量.(注意,您应该为此消费者禁用自动提交.)

An alternative is to write your own tool, that uses an single KafkaConsumer with the corresponding group.id, subscribe to the topic(s) you want to modify offsets for, seek() and commit() offsets. (Note, you should disable auto commit for this consumer.)

这篇关于Kafka 如何使用所需的偏移量提交消费者偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆