使用 JUnit 5 和 EmbeddedKafkaBroker 在 Spring Boot 应用程序中测试 Apache Kafka 集成 [英] Testing an Apache Kafka Integration within a Spring Boot Application with JUnit 5 and EmbeddedKafkaBroker

查看:77
本文介绍了使用 JUnit 5 和 EmbeddedKafkaBroker 在 Spring Boot 应用程序中测试 Apache Kafka 集成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的生产者类,定义如下:

I have a simple producer class defined as follows:

@Configuration
public class MyKafkaProducer {

    private final static Logger log = LoggerFactory.getLogger(MyKafkaProducer.class);

    @Value("${my.kafka.producer.topic}")
    private String topic;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    public void sendDataToKafka(@RequestParam String data) {

        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);

        listenableFuture.addCallback(new ListenableFutureCallback<>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Sent data {}", result.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send data {} due to: {}", data, ex.getMessage());
            }
        });
    }
}

这是正在进行的测试类:

And here is work-in-progress test class:

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerTest {

    private static final String TOPIC = "device";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private MyKafkaProducer producer;

    BlockingQueue<ConsumerRecord<String, String>> records;

    KafkaMessageListenerContainer<String, String> container;

    @BeforeAll
    void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    @Test
    public void testIfWorks() throws InterruptedException {
        // Arrange
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        // Act
        producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
        producer.flush();

        // Assert
        ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
        assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }

问题是测试创建了一个默认的生产者:

The problem is that the test creates a default producer:

Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

如何使用我自己的生产者 MyKafkaProducer,并调用它的 sendDataToKafka 方法?在这种情况下,我们可以如何测试以及测试什么?

How can I use my own producer, MyKafkaProducer, and call its sendDataToKafka method? How and what can we test in this case?

可以在此处找到源代码.正在进行工作测试的分支是这里.谢谢.

The source code could be found here. The branch with a work-in-progress test is here. Thank you.

推荐答案

所以它是一个 Spring Boot 应用程序,您正在使用自动配置的 KafkaTemplate.

So it's a Spring Boot application and you are using the auto-configured KafkaTemplate.

要覆盖 bootstrap-servers 以使用嵌入式 kafka 代理,请参阅 https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#kafka-测试嵌入式kafka-注解

To override the bootstrap-servers to use the embedded kafka broker, see https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#kafka-testing-embeddedkafka-annotation

@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")

然后您可以从测试用例中调用您的生产者.

You can then call your producer from the test case.

这篇关于使用 JUnit 5 和 EmbeddedKafkaBroker 在 Spring Boot 应用程序中测试 Apache Kafka 集成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆