通过Spring-Kafka列出Kafka主题 [英] List Kafka Topics via Spring-Kafka

查看:176
本文介绍了通过Spring-Kafka列出Kafka主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们想通过spring-kafka列出所有Kafka主题,以获得类似于kafka命令的结果:

We would like to list all Kafka topics via spring-kafka to get results similar to the kafka command:

bin/kafka-topics.sh --list --zookeeper localhost:2181

在下面的服务中运行getTopics()方法时,我们得到 org.apache.kafka.common.errors.TimeoutException:提取主题元数据时超时已过期

When running the getTopics() method in the service below, we get org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

配置:

@EnableKafka
@Configuration
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
}

服务:

@Service
public class TopicServiceKafkaImpl implements TopicService {
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Override
    public Set<String> getTopics() {
        try (Consumer<String, String> consumer = 
            consumerFactory.createConsumer()) {
            Map<String, List<PartitionInfo>> map = consumer.listTopics();
            return map.keySet();
    }
}

Kafka已启动并正在运行,我们可以成功地从应用程序向主题发送消息.

Kafka is up and running and we can send messages from our app to a topic succesfully.

推荐答案

您正在连接到Zookeeper(2181),而不是Kafka(默认为9092).

You are connecting to Zookeeper (2181) instead of Kafka (9092 by default).

Java kafka客户端不再直接与ZK对话.

The Java kafka clients no longer talk directly to ZK.

这篇关于通过Spring-Kafka列出Kafka主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆