向ConCurentKafkaListenerContainerFactory添加Consumer RebalanceListener [英] Adding ConsumerRebalanceListener to the ConcurrentKafkaListenerContainerFactory
本文介绍了向ConCurentKafkaListenerContainerFactory添加Consumer RebalanceListener的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
在一个Spring Boot应用程序中,我使用一个用@KafkaListener注释的类作为消息监听器。我想向我的应用程序中添加一个Consumer RebalanceLister,以便在重新平衡时管理缓存数据。
如何将Consumer RebalanceListener添加到ConCurentKafkaListenerContainerFactory。documentation表示应该在ContainerProperties对象上设置它。目前还不清楚如何访问该对象以设置它。此外,它看起来像是ConcurrentKafkaListenerContainerFactory丢弃重新平衡侦听器,因为它在创建侦听器容器实例时创建一个新的ContainerProperties对象。
我觉得我在这里遗漏了一些非常明显的东西,在this commit之前有一个方法直接在ConCurentKafkaListenerContainerFactory上设置重新平衡侦听器。
推荐答案
考虑在ConcurrentKafkaListenerContainerFactory
上使用此方法:
/**
* Obtain the properties template for this factory - set properties as needed
* and they will be copied to a final properties instance for the endpoint.
* @return the properties.
*/
public ContainerProperties getContainerProperties() {
您可以在此处添加您的ConsumerRebalanceListener
。您@Autowired
自动配置ConcurrentKafkaListenerContainerFactory
并执行上述注入:
@Autowired
private ConcurrentKafkaListenerContainerFactory containerFactory;
@PostConstruct
public void init() {
this.containerFactory.getContainerProperties()
.setConsumerRebalanceListener(myConsumerRebalanceListener());
}
@Bean
public ConsumerRebalanceListener myConsumerRebalanceListener() {
return new ConsumerRebalanceListener() {
...
};
}
这篇关于向ConCurentKafkaListenerContainerFactory添加Consumer RebalanceListener的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文