向ConCurentKafkaListenerContainerFactory添加Consumer RebalanceListener [英] Adding ConsumerRebalanceListener to the ConcurrentKafkaListenerContainerFactory

查看:8
本文介绍了向ConCurentKafkaListenerContainerFactory添加Consumer RebalanceListener的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在一个Spring Boot应用程序中,我使用一个用@KafkaListener注释的类作为消息监听器。我想向我的应用程序中添加一个Consumer RebalanceLister,以便在重新平衡时管理缓存数据。

如何将Consumer RebalanceListener添加到ConCurentKafkaListenerContainerFactory。documentation表示应该在ContainerProperties对象上设置它。目前还不清楚如何访问该对象以设置它。此外,它看起来像是ConcurrentKafkaListenerContainerFactory丢弃重新平衡侦听器,因为它在创建侦听器容器实例时创建一个新的ContainerProperties对象。

我觉得我在这里遗漏了一些非常明显的东西,在this commit之前有一个方法直接在ConCurentKafkaListenerContainerFactory上设置重新平衡侦听器。

推荐答案

考虑在ConcurrentKafkaListenerContainerFactory上使用此方法:

/**
 * Obtain the properties template for this factory - set properties as needed
 * and they will be copied to a final properties instance for the endpoint.
 * @return the properties.
 */
public ContainerProperties getContainerProperties() {
您可以在此处添加您的ConsumerRebalanceListener。您@Autowired自动配置ConcurrentKafkaListenerContainerFactory并执行上述注入:

@Autowired
private ConcurrentKafkaListenerContainerFactory containerFactory;

@PostConstruct
public void init() {
    this.containerFactory.getContainerProperties()
            .setConsumerRebalanceListener(myConsumerRebalanceListener());
}

@Bean
public ConsumerRebalanceListener myConsumerRebalanceListener() {
    return new ConsumerRebalanceListener() {
        ...
    };
}

这篇关于向ConCurentKafkaListenerContainerFactory添加Consumer RebalanceListener的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆