从 spring-cloud-stream 访问 kafka 生产者工厂 [英] Access kafka producer factory from spring-cloud-stream
问题描述
我有一个使用 spring-boot-cloud 和 apache-kafka 的项目,感谢 EmbeddedBroker,我有一个涵盖拓扑逻辑的集成测试列表.
I have a project using spring-boot-cloud and apache-kafka, I have a list of integration test covering the topology logic, thanks to EmbeddedBroker.
我最近发现运行这些测试时日志中有很多噪音.
I recently discovered that there are many noise in the log when running these tests.
例如[Producer clientId=producer-2] 无法建立到节点 0 (localhost/127.0.0.1:63267) 的连接.经纪人可能不可用.
e.g. [Producer clientId=producer-2] Connection to node 0 (localhost/127.0.0.1:63267) could not be established. Broker may not be available.
经过反复试验,似乎这些是由 spring-cloud-stream 绑定创建的生产者.以某种方式使用 @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) 在类级别上,它们似乎在每次测试后都没有被清理.
After some trial and error it appears that these were the producers created by the spring-cloud-stream bindings. Somehow with @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) on the class level they do not appear to be cleaned up after each test.
因此我想如果我可以访问生产者工厂,我可以在我的测试类的@AfterEach 子句中手动清理它们.我试过自动装配 KafkaTemplate 但它没有帮助.我不知道如何访问生产者工厂,因为它是由框架隐式创建的.
Thus I figured if I can get access to the producer factory I can then manually clean them up inside the @AfterEach clause of my test class. I've tried to autowire KafkaTemplate but it didn't help. I don't know how I can access the producer factory since it's created implicitly by the framework.
请注意,这些似乎不会影响测试结果,因为它们只会在测试阶段结束时出现.
Please be noted that these do not appear to affect the test result since they only show up at the end of the test phase.
提前致谢!
推荐答案
您可以添加一个 ProducerMessageHandlerCustomizer
bean 并通过这种方式获得对生产者工厂的引用.
You can add a ProducerMessageHandlerCustomizer
bean and get a reference to the producer factory that way.
@Bean
ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler> cust() {
return (handler, dest) -> {
this.pfMap.put(dest, handler.getKafkaTemplate().getProducerFactory());
}
}
将 PF 存储在测试用例的映射中,然后在您想关闭生产者时 reset()
它.
Store the PF in a map in the test case, then reset()
it when you want to close the producer(s).
这篇关于从 spring-cloud-stream 访问 kafka 生产者工厂的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!