在Spring Boot中控制启用/禁用Kafka使用者 [英] Control enabling/disabling Kafka consumers in Spring Boot

查看:926
本文介绍了在Spring Boot中控制启用/禁用Kafka使用者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在Spring Boot中配置了多个Kafka使用者.这就是kafka.properties的样子(此处仅列出一个使用者的配置):

I have configured several Kafka consumers in Spring Boot. This is what the kafka.properties looks like (only listing config for one consumer here):

kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=

这是配置:

@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, String> pindropConsumerFactory() {
        Map<String, Object> dataRiverProps = new HashMap<>();

        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));

        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pindropConsumerFactory());
        return factory;
    }
}

这是消费者:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;


    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        // processing message
    }
}

我是否可以使用道具"kafka.enabled"来控制此使用者的创建或消息检索?非常感谢!

Is there a way for me to use the prop "kafka.enabled" so that I can control the creation or maybe the message retrieval of this consumer? Thanks so much!

推荐答案

您可以通过在消费者中使用属性 autoStartup (true/false)来做到这一点-

You can do it by using property autoStartup (true/false) in consumer like below -

@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
        containerFactory = "kafkaListenerContainerFactory",autoStartup = "${listen.auto.start:false}")
public void consume(String message) {
    //System.out.println("Consumed message: " + message);
}

这篇关于在Spring Boot中控制启用/禁用Kafka使用者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆