从 spring-cloud-stream 访问 kafka 生产者工厂 [英] Access kafka producer factory from spring-cloud-stream

查看:43
本文介绍了从 spring-cloud-stream 访问 kafka 生产者工厂的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用 spring-boot-cloud 和 apache-kafka 的项目,感谢 EmbeddedBroker,我有一个涵盖拓扑逻辑的集成测试列表.

I have a project using spring-boot-cloud and apache-kafka, I have a list of integration test covering the topology logic, thanks to EmbeddedBroker.

我最近发现运行这些测试时日志中有很多噪音.

I recently discovered that there are many noise in the log when running these tests.

例如[Producer clientId=producer-2] 无法建立到节点 0 (localhost/127.0.0.1:63267) 的连接.经纪人可能不可用.

e.g. [Producer clientId=producer-2] Connection to node 0 (localhost/127.0.0.1:63267) could not be established. Broker may not be available.

经过反复试验,似乎这些是由 spring-cloud-stream 绑定创建的生产者.以某种方式使用 @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) 在类级别上,它们似乎在每次测试后都没有被清理.

After some trial and error it appears that these were the producers created by the spring-cloud-stream bindings. Somehow with @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) on the class level they do not appear to be cleaned up after each test.

因此我想如果我可以访问生产者工厂,我可以在我的测试类的@AfterEach 子句中手动清理它们.我试过自动装配 KafkaTemplate 但它没有帮助.我不知道如何访问生产者工厂,因为它是由框架隐式创建的.

Thus I figured if I can get access to the producer factory I can then manually clean them up inside the @AfterEach clause of my test class. I've tried to autowire KafkaTemplate but it didn't help. I don't know how I can access the producer factory since it's created implicitly by the framework.

请注意,这些似乎不会影响测试结果,因为它们只会在测试阶段结束时出现.

Please be noted that these do not appear to affect the test result since they only show up at the end of the test phase.

提前致谢!

推荐答案

您可以添加一个 ProducerMessageHandlerCustomizer bean 并通过这种方式获得对生产者工厂的引用.

You can add a ProducerMessageHandlerCustomizer bean and get a reference to the producer factory that way.

@Bean
ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler> cust() {
    return (handler, dest) -> {
        this.pfMap.put(dest, handler.getKafkaTemplate().getProducerFactory());
    }
}

将 PF 存储在测试用例的映射中,然后在您想关闭生产者时 reset() 它.

Store the PF in a map in the test case, then reset() it when you want to close the producer(s).

这篇关于从 spring-cloud-stream 访问 kafka 生产者工厂的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆