Problems adding multiple KafkaListenerContainerFactories

Question

I'm currently dabbling in Spring Kafka and have succeeded in adding a single KafkaListenerContainerFactory to my listener. Now I'd like to add multiple KafkaListenerContainerFactory beans (one for a topic whose messages are JSON, another for a topic whose messages are plain strings). See the code below:

@EnableKafka
@Configuration
public class KafkaConsumersConfig {

    private final KafkaConfiguration kafkaConfiguration;

    @Autowired
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,Record> jsonConsumerFactory(){
        JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
        return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public Map<String,Object> jsonConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
    @Bean
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,String> fileConsumerFactory(){
        return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs());
    }

    @Bean
    public Map<String,Object> fileConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
}

Running this gives me the following error:

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory'


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

What am I doing wrong?

Recommended answer

It looks like you are not going to rely on Spring Boot's Kafka auto-configuration.

Spring Boot provides the following in KafkaAutoConfiguration:

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {

Since you define jsonConsumerFactory and fileConsumerFactory, the @ConditionalOnMissingBean condition is not met, so your beans replace the one the auto-configuration would otherwise provide.

On the other hand, none of your factories can be applied in KafkaAnnotationDrivenConfiguration:

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {

because your ConsumerFactory beans are not of type ConsumerFactory<Object, Object>.
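
The type mismatch can be illustrated without Spring. The sketch below is a simplified model, not Spring's actual resolution algorithm: it uses a hypothetical stand-in ConsumerFactory interface and plain reflection to compare the generic type arguments of two factory methods, assuming that concrete type arguments must match exactly for a bean to qualify as an injection candidate.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

class GenericInjectionDemo {

    // Hypothetical stand-in for org.springframework.kafka.core.ConsumerFactory.
    interface ConsumerFactory<K, V> {}

    // Mirrors the generic signature of the question's fileConsumerFactory bean.
    static ConsumerFactory<String, String> fileConsumerFactory() { return null; }

    // Mirrors the type that Boot's kafkaListenerContainerFactory method asks for.
    static ConsumerFactory<Object, Object> objectConsumerFactory() { return null; }

    // Reads the generic type arguments declared on a no-arg method's return type.
    static Type[] typeArguments(String methodName) {
        try {
            Method method = GenericInjectionDemo.class.getDeclaredMethod(methodName);
            ParameterizedType type = (ParameterizedType) method.getGenericReturnType();
            return type.getActualTypeArguments();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    // Simplified rule: two parameterized types match only if their arguments are equal.
    static boolean sameTypeArguments(String first, String second) {
        return Arrays.equals(typeArguments(first), typeArguments(second));
    }

    public static void main(String[] args) {
        // <String, String> versus <Object, Object>: prints false
        System.out.println(sameTypeArguments("fileConsumerFactory", "objectConsumerFactory"));
        // <Object, Object> versus <Object, Object>: prints true
        System.out.println(sameTypeArguments("objectConsumerFactory", "objectConsumerFactory"));
    }
}
```

Under this model a ConsumerFactory<String, String> is a ConsumerFactory, which is why @ConditionalOnMissingBean sees it, yet it is not a ConsumerFactory<Object, Object>, which is why the injection point stays unsatisfied.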

So:

  • Exclude KafkaAutoConfiguration from the Spring Boot auto-configuration by adding the following to the application properties file: spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
  • or rename one of your KafkaListenerContainerFactory beans to kafkaListenerContainerFactory so that it overrides the one provided by Boot
  • or declare one of the ConsumerFactory beans as a ConsumerFactory<Object, Object> type.
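
As an illustration of the second option, here is a minimal configuration sketch. It is not the asker's final code and has not been checked against a specific Spring Kafka version. It assumes spring-kafka is on the classpath and that the fileConsumerFactory and kafkaJsonListenerContainerFactory beans from the question are still defined elsewhere. The class names RenamedFactoryConfig and FileTopicListener and the topic name "file-topic" are hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Configuration
class RenamedFactoryConfig {

    // Naming this bean "kafkaListenerContainerFactory" lets Boot's
    // @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") back off,
    // so Boot no longer asks for a ConsumerFactory<Object, Object>.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> fileConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory);
        factory.setConcurrency(3);
        return factory;
    }
}

@Component
class FileTopicListener {

    // No containerFactory attribute: the listener falls back to the bean
    // named "kafkaListenerContainerFactory". "file-topic" is a hypothetical name.
    @KafkaListener(topics = "file-topic")
    public void onFileMessage(String message) {
        System.out.println("Received: " + message);
    }
}
```

A listener for the JSON topic would select the other factory explicitly, for example with @KafkaListener(topics = ..., containerFactory = "kafkaJsonListenerContainerFactory").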
