Apache Kafka Client (Java):列出主题并检查主题是否被日志压缩 [英] Apache Kafka Client (Java) : List topics and check whether topic is log compacted
问题描述
背景
我们公司有由 Zookeeper 管理的 Apache Kafka.我们的 Spring Boot 应用程序之一需要检查所有可用主题的列表,并列出哪些启用了日志压缩 (cleanup.policy=compact).
We have Apache Kafka managed by Zookeeper in our company. One of our Spring Boot applications needs to check the list of all topic available and also list which ones have log compaction enabled (cleanup.policy=compact).
当前代码
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
...
...
public List<String> getTopics() {
Map<String, List<PartitionInfo>> topics = consumerFactory().createConsumer().listTopics();
List<String> topicList = new ArrayList<>();
topics.keySet().remove(CONSUMER_OFFSETS);
topicList.addAll(topics.keySet());
return topicList;
}
问题
通过上面的代码,应用程序可以获得主题列表.有没有办法也知道各个主题是否被日志压缩?我正在寻找的是某种Java"方式来获得与我从终端运行以下 Apache Kafka CLI 命令时得到的响应相同的响应.
With the above code the application can get the list of topics. Is there a way to also get to know whether the individual topics are log compacted ? What I am looking for is some "Java" way to get the same response that I get when I run the following Apache Kafka CLI command from terminal.
kafka-topics --zookeeper localhost:2181 --describe --topic TestTopicCompact
对此的示例响应
Topic:TestTopicCompact PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: TestTopicCompact Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
推荐答案
你应该使用 AdminClient API 以检索该信息.
You should use the AdminClient API to retrieve that information.
- 首先使用
listTopics()
检索主题列表. - 然后使用
describeConfigs()
获取每个主题的配置. - 最后,从您将获得的 ConfigEntry 对象中,您可以过滤具有
compact
作为cleanup.policy
的主题.
- First use
listTopics()
to retrieve the topic list. - Then use
describeConfigs()
to get the configuration for each topic. - Finally, out of the ConfigEntry objects you'll get, you can then filter the topics that have
compact
as thecleanup.policy
.
这基本上就是 kafka-topics
工具正在做的事情,所以你可以看看它的源代码 kafka.admin.TopicCommand
.尽管是 Scala,但概念是相似的.
This is basically what the kafka-topics
tools is doing, so you can have a look at its source kafka.admin.TopicCommand
. Even though it's Scala, the concepts are similar.
这篇关于Apache Kafka Client (Java):列出主题并检查主题是否被日志压缩的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!