Apache Kafka 获取特定主题的消费者列表 [英] Apache Kafka get list of consumers on a specific topic

查看:116
本文介绍了Apache Kafka 获取特定主题的消费者列表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

因为它可以是标题中的客人,有没有办法在java中获取特定主题的消费者列表?直到现在我才能得到这样的主题列表

As it can be guest from the title, is there a way to get the consumer list on a specific topic in java? Untill now Im able to get the list of topics like this

    final ListTopicsResult listTopicsResult = adminClient.listTopics();
    KafkaFuture<Set<String>> kafkaFuture = listTopicsResult.names();
    Set<String> map = kafkaFuture.get();

但我还没有找到获取每个主题的消费者列表的方法

but I havent found a way to get the list of consumers on each topic

推荐答案

我最近正在为我的 kafka 客户端工具解决同样的问题.这并不容易,但我从代码中发现的唯一方法如下:

I was recently solving the same problem for my kafka client tool. It is not easy, but the only way, which I found from the code is the following:

Properties props = ...//here you put your properties
AdminClient kafkaClient = AdminClient.create(props);

//Here you get all the consumer groups
List<String> groupIds = kafkaClient.listConsumerGroups().all().get().
                       stream().map(s -> s.groupId()).collect(Collectors.toList()); 

//Here you get all the descriptions for the groups
Map<String, ConsumerGroupDescription> groups = kafkaClient.
                                               describeConsumerGroups(groupIds).all().get();
for (final String groupId : groupIds) {
    ConsumerGroupDescription descr = groups.get(groupId);
    //find if any description is connected to the topic with topicName
    Optional<TopicPartition> tp = descr.members().stream().
                                  map(s -> s.assignment().topicPartitions()).
                                  flatMap(coll -> coll.stream()).
                                  filter(s -> s.topic().equals(topicName)).findAny();
            if (tp.isPresent()) {
                //you found the consumer, so collect the group id somewhere
            }
} 

此 API 从 2.0 版开始可用.可能有更好的方法,但我找不到.您还可以在我的 bitbucket

This API is available from the version 2.0. There is probably a better way but I was not able to find one. You can also find the code on my bitbucket

这篇关于Apache Kafka 获取特定主题的消费者列表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆