Kafka consumer picking up topics dynamically


Question

I have a Kafka consumer configured in Spring Boot. Here's the config class:

@EnableKafka
@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, GenericData.Record> consumerFactory() {

        Map<String, Object> dataRiverProps = new HashMap<>();
        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        dataRiverProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("schema.registry.url"));
        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

And here is the consumer:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeAvro(GenericData.Record message) {
        messageProcessor.process();
    }

}

Please note that I am using topics = "#{'${kafka.topics}'.split(',')}" to pick up the topics from a properties file. And this is what my kafka.properties file looks like:

kafka.topics=pwdChange,pwdCreation
bootstrap.servers=aaa.bbb.com:37900
group.id=pwdManagement
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
schema.registry.url=http://aaa.bbb.com:37800

Now if I am to add a new topic to the subscription, say pwdExpire, and modify the prop files as follows:

kafka.topics=pwdChange,pwdCreation,pwdExpire

Is there a way for my consumer to start subscribe to this new topic without restarting the server? I have found this post Spring Kafka - Subscribe new topics during runtime, but the documentation has this to say about metadata.max.age.ms:


The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

Sounds to me it won't work. Thanks for your help!

Answer

No; the only way to do that is to use a topic pattern; as new topics are added (that match the pattern), the broker will add them to the subscription, after 5 minutes, by default.
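
A minimal sketch of a pattern-based listener, assuming the topic names all share the pwd prefix (the pattern itself is an assumption, not part of the original question):

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;

    // topicPattern replaces the fixed topic list; topics created later that
    // match "pwd.*" join the subscription on the next metadata refresh
    // (every 5 minutes by default, governed by metadata.max.age.ms).
    @KafkaListener(topicPattern = "pwd.*", containerFactory = "kafkaListenerContainerFactory")
    public void consumeAvro(GenericData.Record message) {
        messageProcessor.process();
    }
}

To discover new topics sooner, the refresh interval can be shortened in the consumer configuration, e.g. dataRiverProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000");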

You can, however, add new listener container(s) for the new topic(s) at runtime.
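
A rough sketch of adding a container at runtime, reusing the consumerFactory bean from the question (the startContainerFor helper and its wiring are assumptions for illustration):

@Autowired
private ConsumerFactory<String, GenericData.Record> consumerFactory;

@Autowired
private MessageProcessor messageProcessor;

public KafkaMessageListenerContainer<String, GenericData.Record> startContainerFor(String topic) {
    ContainerProperties containerProps = new ContainerProperties(topic);
    // Hand every record to the existing processor; MessageListener is the
    // simplest spring-kafka callback contract.
    containerProps.setMessageListener((MessageListener<String, GenericData.Record>) rec ->
            messageProcessor.process());
    KafkaMessageListenerContainer<String, GenericData.Record> container =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    container.start();
    return container;
}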

Another option would be to load the @KafkaListener bean in a child application context and re-create the context each time the topic(s) change.
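
A sketch of the child-context approach (ListenerConfig is a hypothetical @Configuration class that declares the @KafkaListener bean; nothing here comes from the original post):

private AnnotationConfigApplicationContext childContext;

public void reloadListeners(ApplicationContext parent) {
    // Closing the old child context stops its listener containers.
    if (childContext != null) {
        childContext.close();
    }
    // Re-creating the child context re-registers the @KafkaListener beans,
    // which then read the current value of kafka.topics.
    childContext = new AnnotationConfigApplicationContext();
    childContext.setParent(parent);
    childContext.register(ListenerConfig.class);
    childContext.refresh();
}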

EDIT

Regarding KafkaConsumer.subscribe(Pattern pattern) ...

/**
 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
 * The pattern matching will be done periodically against topics existing at the time of check.
 * <p>
 ...
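
As an illustration of that javadoc, a plain Apache Kafka client (not the Spring component above) could subscribe by pattern like this; the pattern and poll loop are assumptions:

org.apache.kafka.clients.consumer.KafkaConsumer<String, GenericData.Record> consumer =
        new org.apache.kafka.clients.consumer.KafkaConsumer<>(dataRiverProps);
// Topics created later that match the pattern are added to the assignment
// on the periodic metadata refresh.
consumer.subscribe(Pattern.compile("pwd.*"));
while (true) {
    ConsumerRecords<String, GenericData.Record> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(rec -> messageProcessor.process());
}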
