如何在springboot中为每个主题动态创建单独的Kafka侦听器? [英] How to create separate Kafka listener for each topic dynamically in springboot?
问题描述
我是Spring和Kafka的新手.我正在使用[使用SpringBoot-kafka]用例,其中允许用户在运行时创建kafka主题.预计Spring应用程序将在运行时以编程方式订阅这些主题.到目前为止,我所知道的是,Kafka监听器是设计时间,因此需要在启动之前指定主题.在SpringBoot-Kafka集成中是否可以动态订阅kafka主题?
I am new to Spring and Kafka. I am working on a use case [using SpringBoot-kafka] where in users are allowed to create kafka topics at runtime. The spring application is expected to subscribe to these topics pro-grammatically at runtime. What i know so far is that, Kafka listener are design time and hence topics needs to be specified before startup. Is there a way to dynamically subscribe to kafka topics in SpringBoot-Kafka integration?
推荐了这个 https://github.com/spring-projects/spring-kafka/issues/132
我打算实现的当前方法是,不使用Spring-Kafka集成,而是自己实现Kafka使用者[使用Java代码],如此处所述 spring boot kafka使用者-如何正确使用使用Spring Boot中的kafka消息
Current approach that i am planning to implement is, do not use Spring-Kafka integration instead implement Kafka consumer myself [using java code] as mentioned here spring boot kafka consumer - how to properly consume kafka messages from spring boot
推荐答案
Kafka侦听器仅是设计时",如果您想使用批注指定它们. Spring-kafka还允许您动态创建它们,请参见 KafkaMessageListenerContainer .
Kafka listeners are only "design time" if you want to specify them using annotations. Spring-kafka allows you to create them dynamically as well, see KafkaMessageListenerContainer.
即时创建Kafka侦听器的最简单示例是:
The simplest example of Kafka listener created on the fly would be:
Map<String, Object> consumerConfig = ImmutableMap.of(
BOOTSTRAP_SERVERS_CONFIG, "brokerAddress",
GROUP_ID_CONFIG, "groupId"
);
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(
consumerConfig,
new StringDeserializer(),
new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties("topicName");
containerProperties.setMessageListener((MessageListener<String, String>) record -> {
//do something with received record
}
ConcurrentMessageListenerContainer container =
new ConcurrentMessageListenerContainer<>(
kafkaConsumerFactory,
containerProperties);
container.start();
有关更多说明和代码,请参阅此博客文章: http ://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/
For more explanation and code see this blog post: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/
这篇关于如何在springboot中为每个主题动态创建单独的Kafka侦听器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!