Kafka高级消费者使用Java API从主题获取所有消息(等效于--from-beginning) [英] Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)
问题描述
我正在使用Kafka网站上的ConsumerGroupExample代码来测试Kafka高级消费者.我想检索有关我在Kafka服务器配置中称为测试"的所有现有消息.在其他博客中,应将auto.offset.reset设置为最小",以便能够获取所有消息:
I am testing the Kafka High Level Consumer using the ConsumerGroupExample code from the Kafka site. I would like to retrieve all the existing messages on the topic called "test" that I have in the Kafka server config. Looking at other blogs, auto.offset.reset should be set to "smallest" to be able to get all messages:
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
我真正的问题是:高级消费者的等效Java api调用是什么?
The question I really have is this: what is the equivalent Java api call for the High Level Consumer that is the equivalent of:
bin/kafka-console-consumer.sh --zookeeper本地主机:2181 --topic测试--from-beginning
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
推荐答案
基本上,每次新使用者尝试使用某个主题时,它都会从头开始读取消息.如果您特别出于每次测试目的而从头开始消费,那么每次使用新的groupID初始化消费者时,它都会从头开始读取消息.这是我的操作方式:
Basically, everytime a new consumer tries to consume a topic, it'll read messages from the beginning. If you're especially just consuming from the beginning each time for testing purposes, everytime you initialise your consumer with a new groupID, it'll read the messages from the beginning. Here's how I did it :
properties.put("group.id", UUID.randomUUID().toString());
,每次都从头开始阅读消息!
and read messages from the beginning each time!
这篇关于Kafka高级消费者使用Java API从主题获取所有消息(等效于--from-beginning)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!