如何用所需的偏移量更改卡夫卡承诺的消费者偏移量 [英] How change Kafka committed consumer offset with required offset

查看:116
本文介绍了如何用所需的偏移量更改卡夫卡承诺的消费者偏移量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有Kafka Stream应用程序.我的应用程序正在成功处理事件.

I have Kafka Stream application. My application is processing the events successfully.

如何使用所需的偏移量更改Kafka提交的使用者偏移量,以重新处理/跳过事件.我尝试了如何更改主题的起始偏移量.但我收到节点不存在:"错误.请帮助我.

How to change Kafka committed consumer offset with required offset to reprocess/skip the events. I tried How to change start offset for topic?. But I got 'Node does not exist:' error. Please help me.

推荐答案

您所指的问题/答案基于较旧的Kafka版本.从Kafka 0.9开始,偏移量不会提交给ZooKeeper,而是存储在称为 offset topic (主题名称为__consumer_offsets)的特殊Kafka主题中.

The question/answer you are referring to is based on an older Kafka version. Since Kafka 0.9, offsets are not committed to ZooKeeper but store in a special Kafka topic called the offset topic (topic name is __consumer_offsets).

自Kafka 1.0起,命令行工具bin/kafka-consumer-groups.sh具有允许设置偏移量的新功能.查看原始KIP:

Since Kafka 1.0, the command line tool bin/kafka-consumer-groups.sh has a new feature that allow to set offsets. Check out the original KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

该工具还适用于Kafka 0.11(甚至可能是较旧的Kafka版本).

The tool also works for Kafka 0.11 (and maybe even older Kafka versions).

一种替代方法是编写自己的工具,该工具使用单个KafkaConsumer和相应的group.id,订阅要修改其偏移量的主题,seek()commit()偏移量. (请注意,您应该为此用户禁用自动提交.)

An alternative is to write your own tool, that uses an single KafkaConsumer with the corresponding group.id, subscribe to the topic(s) you want to modify offsets for, seek() and commit() offsets. (Note, you should disable auto commit for this consumer.)

这篇关于如何用所需的偏移量更改卡夫卡承诺的消费者偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆