Spring Kafka integration testing with embedded Kafka
Question
I have a Spring Boot application whose consumer reads from a topic in one cluster and produces to another topic in a different cluster.
Now I'm trying to write an integration test using Spring's embedded Kafka, but I get an error: the KafkaTemplate bean could not be registered because a bean with that name has already been defined in a class path resource.
Consumer class
@Service
public class KafkaConsumerService {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @KafkaListener(topics = "${kafka.producer.topic}")
    public void professor(List<Professor> pro) {
        pro.forEach(kafkaProducerService::produce);
    }
}
Producer class
@Service
public class KafkaProducerService {

    @Value("${kafka.producer.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void produce(Professor pro) {
        kafkaTemplate.send(topic, "professor", pro);
    }
}
In my test cases I want to override KafkaTemplate so that when I call the kafkaConsumerService.professor method in the test, it produces the data to the embedded Kafka broker, where I can validate it.
Test configuration
@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
        brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {

    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Before
    public void setUp() throws Exception {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbeded.getPartitionsPerTopic());
        }
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }
}
Test class
@EnableKafka
@SpringBootTest(classes = {KafkaProducerConfigTest.class})
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    @Test
    public void testReceive() throws Exception {
        kafkaConsumerService.professor(Arrays.asList(new Professor()));
        // How to check messages is sent to kafka?
    }
}
Error
The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered.
A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
Also, can someone help me validate the messages sent to the embedded Kafka server?
Note: I also get some deprecation warnings:
The type KafkaEmbedded is deprecated
The method getPartitionsPerTopic() from the type KafkaEmbedded is deprecated
The method producerProps(KafkaEmbedded) from the type KafkaTestUtils is deprecated
Recommended answer
Spring Boot 2.1 disables bean overriding by default. From the release notes:
"Bean overriding has been disabled by default to prevent a bean being accidentally overridden. If you are relying on overriding, you will need to set spring.main.allow-bean-definition-overriding to true."
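One way to apply that setting only to the test, rather than to the whole application, is the properties attribute of @SpringBootTest. This is a minimal sketch that reuses the class names from the question. It assumes the test context loads both the production KafkaProducerConfig and the test configuration, which the error message suggests is the case here.

```java
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

// Sketch only: enables bean overriding for this test context, so the
// kafkaTemplate bean from KafkaProducerConfigTest can replace the one
// defined in KafkaProducerConfig.
@RunWith(SpringRunner.class)
@SpringBootTest(
        classes = {KafkaProducerConfigTest.class},
        properties = "spring.main.allow-bean-definition-overriding=true")
public class KafkaProducerServiceTest {
    // test methods as in the question
}
```

Setting the same property in a test-only properties file should have the same effect. Renaming one of the two beans, as the error message also suggests, avoids the need for overriding altogether.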
Regarding the deprecations, see the javadocs for @EmbeddedKafka. KafkaEmbedded is replaced by EmbeddedKafkaBroker.
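As a rough illustration of the replacement API, the sketch below autowires EmbeddedKafkaBroker instead of KafkaEmbedded and reads a record back from the embedded broker to verify it, which is one possible answer to the validation part of the question. It assumes spring-kafka-test 2.2 or later and JUnit 4 on the classpath. The topic name, the group id, the test class name, and the plain String payload are hypothetical stand-ins. The real application sends a Professor object, so it would need matching serializer and deserializer settings.

```java
import static org.junit.Assert.assertEquals;

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = EmbeddedBrokerSketchTest.TOPIC)
public class EmbeddedBrokerSketchTest {

    // Hypothetical topic name used only for this sketch.
    static final String TOPIC = "professor-out";

    // Replaces the deprecated KafkaEmbedded type.
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void sendsRecordToEmbeddedBroker() {
        // Producer pointed at the embedded broker; the defaults use an
        // Integer key serializer, so both serializers are set to String.
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaTemplate<String, String> template =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(producerProps));

        // Test consumer that reads the topic from the beginning.
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("sketch-group", "true", embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        Consumer<String, String> consumer =
                new DefaultKafkaConsumerFactory<String, String>(consumerProps).createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);

        try {
            template.send(TOPIC, "professor", "payload");
            template.flush();

            // Fails the test if exactly one record does not arrive in time.
            ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
            assertEquals("professor", record.key());
            assertEquals("payload", record.value());
        } finally {
            consumer.close();
        }
    }

    // Empty configuration so the test context has something to load.
    @Configuration
    static class Config {
    }
}
```

In the question's setup, the same consumer-side check could follow the call to kafkaConsumerService.professor(...), provided the overridden KafkaTemplate is built from KafkaTestUtils.producerProps(embeddedKafka).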