Apache Kafka Client (Java):列出主题并检查主题是否被日志压缩 [英] Apache Kafka Client (Java) : List topics and check whether topic is log compacted

查看:35
本文介绍了Apache Kafka Client (Java):列出主题并检查主题是否被日志压缩的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

背景

我们公司有由 Zookeeper 管理的 Apache Kafka.我们的 Spring Boot 应用程序之一需要检查所有可用主题的列表,并列出哪些启用了日志压缩 (cleanup.policy=compact).

We have Apache Kafka managed by Zookeeper in our company. One of our Spring Boot applications needs to check the list of all topic available and also list which ones have log compaction enabled (cleanup.policy=compact).

当前代码


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

...
...

    public List<String> getTopics() {
        Map<String, List<PartitionInfo>> topics = consumerFactory().createConsumer().listTopics();
        List<String> topicList = new ArrayList<>();
        topics.keySet().remove(CONSUMER_OFFSETS);
        topicList.addAll(topics.keySet());
        return topicList;
    }

问题

通过上面的代码,应用程序可以获得主题列表.有没有办法也知道各个主题是否被日志压缩?我正在寻找的是某种Java"方式来获得与我从终端运行以下 Apache Kafka CLI 命令时得到的响应相同的响应.

With the above code the application can get the list of topics. Is there a way to also get to know whether the individual topics are log compacted ? What I am looking for is some "Java" way to get the same response that I get when I run the following Apache Kafka CLI command from terminal.

kafka-topics --zookeeper localhost:2181 --describe --topic TestTopicCompact

对此的示例响应

Topic:TestTopicCompact  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: TestTopicCompact Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

推荐答案

你应该使用 AdminClient API 以检索该信息.

You should use the AdminClient API to retrieve that information.

  • 首先使用listTopics() 检索主题列表.
  • 然后使用 describeConfigs() 获取每个主题的配置.
  • 最后,从您将获得的 ConfigEntry 对象中,您可以过滤具有 compact 作为 cleanup.policy 的主题.
  • First use listTopics() to retrieve the topic list.
  • Then use describeConfigs() to get the configuration for each topic.
  • Finally, out of the ConfigEntry objects you'll get, you can then filter the topics that have compact as the cleanup.policy.

这基本上就是 kafka-topics 工具正在做的事情,所以你可以看看它的源代码 kafka.admin.TopicCommand.尽管是 Scala,但概念是相似的.

This is basically what the kafka-topics tools is doing, so you can have a look at its source kafka.admin.TopicCommand. Even though it's Scala, the concepts are similar.

这篇关于Apache Kafka Client (Java):列出主题并检查主题是否被日志压缩的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆