多个消费者使用春卡夫卡 [英] Multiple consumers using spring kafka
本文介绍了多个消费者使用春卡夫卡的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我希望在我的应用程序中设置多个Kafka主题侦听器。下面是我的设置。它应该由两个组使用,但只由一个监听器使用。我在这里错过了什么?
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory(consumerConfigs());
return consumerFactory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(100);
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean("notificationFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> notificationFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(100);
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean("insertContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> insertContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(100);
factory.setConsumerFactory(consumerFactory());
return factory;
}
@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", group = "insert_listener", containerFactory = "insertContainerFactory")
public void receiveForInsert(String message) {
locationProcessor.insertLocationData(message);
}
@KafkaListener(id = "notification_listener", topics = "${kafka.topic.readlocation}", group = "notification_listener",containerFactory="notificationFactory")
public void receiveForNotification(String message) {
locationProcessor.processNotificationMessage(message);
}
编辑:以下是运行正常的代码
@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", groupId = "insert_listener")
public void receiveForInsert(String message) {
locationProcessor.insertLocationData(message);
}
推荐答案
每个都需要不同的group.id
;group
属性不是group.id
-请参阅在即将发布的1.3版本中,有一个新的groupId
属性,我们还可以将id
作为一个组使用(如果存在)。
对于早期版本,每个版本需要不同的使用者工厂。
这篇关于多个消费者使用春卡夫卡的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文