Spring Kafka:在运行时订阅新的主题模式 [英] Spring Kafka : Subscribe to a new Topic Pattern during Runtime
问题描述
我正在使用注解 @KafkaListener 在我的应用程序中使用主题.我需要在运行时更改已经运行的使用者的主题模式,以便可以使用与新模式匹配的新主题.
I'm using the annotation @KafkaListener to consume topics in my application. I need to change the topic pattern at runtime in an already running consumer so that new topics that match the new pattern can be consumed.
我尝试了下面的代码,但它仍然使用与旧主题模式匹配的主题.在这里,我在应用程序启动时设置了旧主题模式".然后,我使用 Spring @Scheduler 每 10 秒将模式更新为new-topic-pattern".
I tried the below code, but it still consumes the topics matching the old topic pattern. Here, I have set the "old-topic-pattern" at application start-up. Then, I'm updating the pattern to "new-topic-pattern" every 10 seconds using a Spring @Scheduler.
Class "KafkaTopicPatternConfig.java":
@Configuration
public class KafkaTopicPatternConfig {
@Bean
public String kafkaTopicPattern(Environment env) {
logger.info("Getting kafka topic pattern");
String kafkaTopicPattern = "old-topic-pattern";
return kafkaTopicPattern;
}
}
Class "Consumer.java":
@Component
public class Consumer implements ConsumerSeekAware{
@Autowired
@Qualifier("kafkaTopicPattern")
private String kafkaTopicPattern;
@KafkaListener(topicPattern = "#{kafkaTopicPattern}", id = "s4federatorConsumer")
public void processMessage(@Payload ConsumerRecord<String, Object> record,
@Header(KafkaHeaders.OFFSET) Long offset,
@Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partitionId) {
//do something with the consumed message
}
@Scheduled(fixedDelay = 10000, initialDelay = 15000)
public void refreshKafkaTopics() {
logger.info("Inside scheduler to refresh kafka topics");
this.kafkaTopicPattern = "new-topic-pattern";
this.kafkaListenerEndpointRegistry.getListenerContainer("s4federatorConsumer").stop();
this.kafkaListenerEndpointRegistry.getListenerContainer("s4federatorConsumer").start();
}
}
推荐答案
您将 kafkaTopicPattern 作为 -
You are getting kafkaTopicPattern as -
@Qualifier("kafkaTopicPattern")
private String kafkaTopicPattern;
我看到你正在更新模式 -
I see you are updating the pattern like -
this.kafkaTopicPattern = "new-topic-pattern";
但是如果这两个在不同的实例对象中,则注入到侦听器中的kafkaTopicPattern"的原始值不会因此刷新.因此,您必须确保使用新模式刷新侦听器对象.
But the original value for "kafkaTopicPattern" which is injected in listener wont be refreshed by this if these 2 are in different instance objects. So you will have to make sure that the listener objects are refreshed with the new pattern.
这篇关于Spring Kafka:在运行时订阅新的主题模式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!