无法对@KafkaListener注释的方法进行单元测试 [英] Unable to unit test a @KafkaListener annotated method

查看:179
本文介绍了无法对@KafkaListener注释的方法进行单元测试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在春季对kafka消费者类进行单元测试.我想知道,如果将kafka消息发送到其主题,则正确调用了listener方法.我的消费类是这样注释的:

I'm trying to unit test a kafka consumer class, in Spring. I want to know that if a kafka message is sent to it's topic, the listener method was called correctly. My consumer class is annotated like this:

@KafkaListener(topics = "${kafka.topics.myTopic}")
public void myKafkaMessageEvent(final String message) { ...

如果我@Autowire使用者,则当我发送kafka消息时,将正确调用listener方法,但由于类不是模拟类,因此我无法断言该方法已被调用.

If I @Autowire a consumer, when I send a kafka message, the listener method is called correctly, but I can't assert that the method was called because the class isn't a mock.

如果我模拟消费者,则当我发送kafka消息时,根本不会调用listener方法.我可以直接调用该方法,并断言它可以工作,但这并不能满足我的要求,即检查在我向其主题发送kafka消息时是否调用了该方法.

If I mock a consumer, when I send a kafka message the listener method is not called at all. I can call the method directly, and assert that it worked, but that doesn't do what I want, which is to check if the method is called when I send a kafka message to it's topic.

现在,我已经诉诸于在消费者中放置一个计数器,并在每次调用listener方法时将其递增,然后检查其值是否已更改.对我来说,为测试而创建变量似乎是一个糟糕的解决方案.

For now I have resorted to put a counter inside the consumer, and increment it every time the listener method is called, then check that it's value has been changed. Making a variable just for testing seems like a terrible solution to me.

也许有办法让嘲笑的消费者也收到kafka消息吗?还是通过其他方式断言是否调用了非模拟的消费者侦听器方法?

Is there maybe a way to make the mocked consumer receive the kafka messages too? Or some other way to assert that the non-mocked consumer listener method was called?

推荐答案

听起来像您在请求类似Spring AMQP测试框架中的内容:

Sounds like you are requesting something similar what we have in Spring AMQP Testing Framework: https://docs.spring.io/spring-amqp/docs/2.0.3.RELEASE/reference/html/_reference.html#test-harness

因此,如果您对额外的变量不满意,可以借用

So, if you are not good with extra variable you can borrow that solution and implement your own "harness".

我认为这应该是对框架的一个很好的补充,因此,请提出一个适当的 issue ,我们可以一起为公众带来这样的工具.

I think that should be a good addition to the Framework so, please, raise an appropriate issue and we can together bring such a tool for the public.

更新

因此,根据Spring AMQP基础,我在测试配置中做到了这一点:

So, according Spring AMQP foundation I did this in my test configuration:

public static class KafkaListenerTestHarness extends KafkaListenerAnnotationBeanPostProcessor {

    private final Map<String, Object> listeners = new HashMap<>();

    @Override
    protected void processListener(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {

        bean = Mockito.spy(bean);

        this.listeners.put(kafkaListener.id(), bean);

        super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName);
    }

    @SuppressWarnings("unchecked")
    public <T> T getSpy(String id) {
        return (T) this.listeners.get(id);
    }

}

...

@SuppressWarnings("rawtypes")
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static KafkaListenerTestHarness kafkaListenerAnnotationBeanPostProcessor() {
    return new KafkaListenerTestHarness();
}

然后在目标测​​试用例中,我像这样使用它:

Then in the target test-case I use it like this:

@Autowired
private KafkaListenerTestHarness harness;
...
Listener listener = this.harness.getSpy("foo");

verify(listener, times(2)).listen1("foo");

这篇关于无法对@KafkaListener注释的方法进行单元测试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆