如何在springboot中为每个主题动态创建单独的Kafka侦听器? [英] How to create separate Kafka listener for each topic dynamically in springboot?

查看:2475
本文介绍了如何在springboot中为每个主题动态创建单独的Kafka侦听器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Spring和Kafka的新手.我正在使用[使用SpringBoot-kafka]用例,其中允许用户在运行时创建kafka主题.预计Spring应用程序将在运行时以编程方式订阅这些主题.到目前为止,我所知道的是,Kafka监听器是设计时间,因此需要在启动之前指定主题.在SpringBoot-Kafka集成中是否可以动态订阅kafka主题?

I am new to Spring and Kafka. I am working on a use case [using SpringBoot-kafka] where in users are allowed to create kafka topics at runtime. The spring application is expected to subscribe to these topics pro-grammatically at runtime. What i know so far is that, Kafka listener are design time and hence topics needs to be specified before startup. Is there a way to dynamically subscribe to kafka topics in SpringBoot-Kafka integration?

推荐了这个 https://github.com/spring-projects/spring-kafka/issues/132

我打算实现的当前方法是,不使用Spring-Kafka集成,而是自己实现Kafka使用者[使用Java代码],如此处所述 spring boot kafka使用者-如何正确使用使用Spring Boot中的kafka消息

Current approach that i am planning to implement is, do not use Spring-Kafka integration instead implement Kafka consumer myself [using java code] as mentioned here spring boot kafka consumer - how to properly consume kafka messages from spring boot

推荐答案

Kafka侦听器仅是设计时",如果您想使用批注指定它们. Spring-kafka还允许您动态创建它们,请参见 KafkaMessageListenerContainer .

Kafka listeners are only "design time" if you want to specify them using annotations. Spring-kafka allows you to create them dynamically as well, see KafkaMessageListenerContainer.

即时创建Kafka侦听器的最简单示例是:

The simplest example of Kafka listener created on the fly would be:

Map<String, Object> consumerConfig = ImmutableMap.of(
    BOOTSTRAP_SERVERS_CONFIG, "brokerAddress",
    GROUP_ID_CONFIG, "groupId"
);

DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
        new DefaultKafkaConsumerFactory<>(
                consumerConfig,
                new StringDeserializer(),
                new StringDeserializer());

ContainerProperties containerProperties = new ContainerProperties("topicName");
containerProperties.setMessageListener((MessageListener<String, String>) record -> {
     //do something with received record
} 

ConcurrentMessageListenerContainer container =
        new ConcurrentMessageListenerContainer<>(
                kafkaConsumerFactory,
                containerProperties);

container.start();

有关更多说明和代码,请参阅此博客文章: http ://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/

For more explanation and code see this blog post: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/

这篇关于如何在springboot中为每个主题动态创建单独的Kafka侦听器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆