使用 Spring Embedded Kafka 测试 @KafkaListener [英] Testing a @KafkaListener using Spring Embedded Kafka

查看:49
本文介绍了使用 Spring Embedded Kafka 测试 @KafkaListener的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试为我正在使用 Spring Boot 2.x 开发的 Kafka 侦听器编写单元测试.作为单元测试,我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例.所以,我决定使用 Spring Embedded Kafka.

I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Being a unit test, I don't want to start up a full Kafka server an instance of Zookeeper. So, I decided to use Spring Embedded Kafka.

我的听众的定义非常基本.

The definition of my listener is very basic.

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

此外,在收到消息后验证 latch 计数器是否为零的测试也非常简单.

Also the test, that verifies the latch counter to be equal to zero after receiving a message, is very easy.

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

不幸的是,测试失败了,我不明白为什么.是否可以使用 KafkaEmbedded 的实例来测试标记了注释 @KafkaListener 的方法?

Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbedded to test a method marked with the annotation @KafkaListener?

所有代码都在我的 GitHub 存储库中共享 kafka-listener.

All the code is shared in my GitHub repository kafka-listener.

谢谢大家.

推荐答案

您可能在为使用者分配主题/分区之前发送消息.设置属性...

You are probably sending the message before the consumer has been assigned the topic/partition. Set property...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...默认为latest.

这就像在控制台消费者中使用 --from-beginning 一样.

This is like using --from-beginning with the console consumer.

编辑

哦;您没有使用引导的属性.

Oh; you're not using boot's properties.

添加

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

EDIT2

顺便说一句,您可能还应该对 template.send() 的结果执行 get(10L, TimeUnit.SECONDS)(Future<;>) 断言发送成功.

BTW, you should probably also do a get(10L, TimeUnit.SECONDS) on the result of the template.send() (a Future<>) to assert that the send was successful.

EDIT3

要覆盖仅为测试而重置的偏移量,您可以执行与对代理地址所做的相同的操作:

To override the offset reset just for the test, you can do the same as what you did for the broker addresses:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

但是,请记住,此属性仅适用于组第一次消费时.每次应用启动时总是从最后开始,你必须在启动过程中寻找到最后.

However, bear in mind that this property only applies the first time a group consumes. To always start at the end each time the app starts, you have to seek to the end during startup.

另外,我建议将 enable.auto.commit 设置为 false 以便容器负责提交偏移量,而不是仅仅依赖消费者客户端来做按时间安排.

Also, I would recommend setting enable.auto.commit to false so that the container takes care of committing the offsets rather than just relying on the consumer client doing it on a time schedule.

这篇关于使用 Spring Embedded Kafka 测试 @KafkaListener的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆