如何测试 ConsumerAwareRebalanceListener? [英] How to test a ConsumerAwareRebalanceListener?
问题描述
我使用 Spring Boot 2.0.6 开发了一个 @KafkaListener
,它也标有 ConsumerAwareRebalanceListener
接口.我实现了 onPartitionsAssigned
方法,在该方法中我倒退了固定时间的偏移量,比如说 60 秒.
到目前为止一切顺利.
如何使用 Spring Kafka 提供的工具测试上述用例?我想我需要启动一个 Kafka 代理(即 EmbeddedKafka
),然后停止侦听器,然后再次重新启动它,以测试它是否再次读取过去 60 秒内到达的消息.>
有人可以帮我吗?我用谷歌搜索了一下,但没有找到任何东西.非常感谢.
@KafkaListener
有一个:
/*** 为此端点管理的容器的唯一标识符.* <p>如果没有指定,则提供自动生成的.* @return 管理此端点的容器的 {@code id}.* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)*/String id() 默认"";
属性,因此您可以通过提到的KafkaListenerEndpointRegistry
访问其MessageListenerContainer
,您可以简单地@Autowired
进入基于测试类关于 Spring 测试框架.然后,你真的可以在你的测试方法中 stop()
和 start()
那个 MessageListenerContainer
.
还要注意 @KafkaListener
如何也有一个 autoStartup()
属性.
I developed a @KafkaListener
that is also marked with the ConsumerAwareRebalanceListener
interface, using Spring Boot 2.0.6. I implemented the onPartitionsAssigned
method, in which I rewind the offset of a fixed amount of time, let's say 60 seconds.
So far so good.
How can I test the above use case using the tools that Spring Kafka gives me? I supposed I need to start a Kafka broker (i.e., an EmbeddedKafka
), then stopping the listener and then rebooting it again, to test that it read again the messages arrived in the last 60 seconds.
Can somebody help me? I googled a little, but I didn't find anything. Thanks a lot.
The @KafkaListener
has an:
/**
* The unique identifier of the container managing for this endpoint.
* <p>If none is specified an auto-generated one is provided.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
attribute, so you can get an access to its MessageListenerContainer
via mentioned KafkaListenerEndpointRegistry
, which you can simply @Autowired
into the test class based on Spring Testing Framework. Then, you can really stop()
and start()
that MessageListenerContainer
in your test method.
Also pay attention how @KafkaListener
has an autoStartup()
attribute also.
这篇关于如何测试 ConsumerAwareRebalanceListener?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!