如何测试 ConsumerAwareRebalanceListener? [英] How to test a ConsumerAwareRebalanceListener?

查看:32
本文介绍了如何测试 ConsumerAwareRebalanceListener?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 Spring Boot 2.0.6 开发了一个 @KafkaListener,它也标有 ConsumerAwareRebalanceListener 接口.我实现了 onPartitionsAssigned 方法,在该方法中我倒退了固定时间的偏移量,比如说 60 秒.

到目前为止一切顺利.

如何使用 Spring Kafka 提供的工具测试上述用例?我想我需要启动一个 Kafka 代理(即 EmbeddedKafka),然后停止侦听器,然后再次重新启动它,以测试它是否再次读取过去 60 秒内到达的消息.>

有人可以帮我吗?我用谷歌搜索了一下,但没有找到任何东西.非常感谢.

解决方案

@KafkaListener 有一个:

/*** 为此端点管理的容器的唯一标识符.* <p>如果没有指定,则提供自动生成的.* @return 管理此端点的容器的 {@code id}.* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)*/String id() 默认"";

属性,因此您可以通过提到的KafkaListenerEndpointRegistry 访问其MessageListenerContainer,您可以简单地@Autowired 进入基于测试类关于 Spring 测试框架.然后,你真的可以在你的测试方法中 stop()start() 那个 MessageListenerContainer.

还要注意 @KafkaListener 如何也有一个 autoStartup() 属性.

I developed a @KafkaListener that is also marked with the ConsumerAwareRebalanceListener interface, using Spring Boot 2.0.6. I implemented the onPartitionsAssigned method, in which I rewind the offset of a fixed amount of time, let's say 60 seconds.

So far so good.

How can I test the above use case using the tools that Spring Kafka gives me? I supposed I need to start a Kafka broker (i.e., an EmbeddedKafka), then stopping the listener and then rebooting it again, to test that it read again the messages arrived in the last 60 seconds.

Can somebody help me? I googled a little, but I didn't find anything. Thanks a lot.

解决方案

The @KafkaListener has an:

/**
 * The unique identifier of the container managing for this endpoint.
 * <p>If none is specified an auto-generated one is provided.
 * @return the {@code id} for the container managing for this endpoint.
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
 */
String id() default "";

attribute, so you can get an access to its MessageListenerContainer via mentioned KafkaListenerEndpointRegistry, which you can simply @Autowired into the test class based on Spring Testing Framework. Then, you can really stop() and start() that MessageListenerContainer in your test method.

Also pay attention how @KafkaListener has an autoStartup() attribute also.

这篇关于如何测试 ConsumerAwareRebalanceListener?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆