无法对 @KafkaListener 注释方法进行单元测试 [英] Unable to unit test a @KafkaListener annotated method
问题描述
我正在尝试在 Spring 中对 kafka 消费者类进行单元测试.我想知道如果 kafka 消息被发送到它的主题,监听器方法被正确调用.我的消费者类是这样注释的:
I'm trying to unit test a kafka consumer class, in Spring. I want to know that if a kafka message is sent to it's topic, the listener method was called correctly. My consumer class is annotated like this:
@KafkaListener(topics = "${kafka.topics.myTopic}")
public void myKafkaMessageEvent(final String message) { ...
如果我@Autowire 一个消费者,当我发送 kafka 消息时,监听器方法被正确调用,但我不能断言该方法被调用,因为该类不是模拟.
If I @Autowire a consumer, when I send a kafka message, the listener method is called correctly, but I can't assert that the method was called because the class isn't a mock.
如果我模拟消费者,当我发送 kafka 消息时,根本不会调用侦听器方法.我可以直接调用该方法,并断言它有效,但这不是我想要的,即检查当我向其主题发送 kafka 消息时是否调用了该方法.
If I mock a consumer, when I send a kafka message the listener method is not called at all. I can call the method directly, and assert that it worked, but that doesn't do what I want, which is to check if the method is called when I send a kafka message to it's topic.
现在我已经在消费者内部放置一个计数器,并在每次调用侦听器方法时增加它,然后检查它的值是否已更改.为测试创建一个变量对我来说似乎是一个糟糕的解决方案.
For now I have resorted to put a counter inside the consumer, and increment it every time the listener method is called, then check that it's value has been changed. Making a variable just for testing seems like a terrible solution to me.
有没有办法让被模拟的消费者也收到 kafka 消息?或者以其他方式断言非模拟消费者侦听器方法被调用?
Is there maybe a way to make the mocked consumer receive the kafka messages too? Or some other way to assert that the non-mocked consumer listener method was called?
推荐答案
听起来您正在请求类似于我们在 Spring AMQP 测试框架中提供的内容:https://docs.spring.io/spring-amqp/docs/2.0.3.RELEASE/reference/html/_reference.html#test-harness
Sounds like you are requesting something similar what we have in Spring AMQP Testing Framework: https://docs.spring.io/spring-amqp/docs/2.0.3.RELEASE/reference/html/_reference.html#test-harness
所以,如果你不擅长额外的变量,你可以借用 解决方案并实现您自己的线束".
So, if you are not good with extra variable you can borrow that solution and implement your own "harness".
我认为这应该是对框架的一个很好的补充,所以,请提出一个适当的issue,我们可以一起为公众带来这样的工具.
I think that should be a good addition to the Framework so, please, raise an appropriate issue and we can together bring such a tool for the public.
更新
因此,根据 Spring AMQP 基础,我在我的测试配置中这样做了:
So, according Spring AMQP foundation I did this in my test configuration:
public static class KafkaListenerTestHarness extends KafkaListenerAnnotationBeanPostProcessor {
private final Map<String, Object> listeners = new HashMap<>();
@Override
protected void processListener(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener,
Object bean, Object adminTarget, String beanName) {
bean = Mockito.spy(bean);
this.listeners.put(kafkaListener.id(), bean);
super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName);
}
@SuppressWarnings("unchecked")
public <T> T getSpy(String id) {
return (T) this.listeners.get(id);
}
}
...
@SuppressWarnings("rawtypes")
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static KafkaListenerTestHarness kafkaListenerAnnotationBeanPostProcessor() {
return new KafkaListenerTestHarness();
}
然后在目标测试用例中我像这样使用它:
Then in the target test-case I use it like this:
@Autowired
private KafkaListenerTestHarness harness;
...
Listener listener = this.harness.getSpy("foo");
verify(listener, times(2)).listen1("foo");
这篇关于无法对 @KafkaListener 注释方法进行单元测试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!