Spring Kafka integration testing with embedded Kafka
Question
I have a Spring Boot application with a consumer that consumes from a topic in one cluster and produces to another topic in a different cluster.
Now I'm trying to write an integration test using Spring's embedded Kafka, but I get this error: KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource
Consumer class
@Service
public class KafkaConsumerService {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @KafkaListener(topics = "${kafka.producer.topic}")
    public void professor(List<Professor> pro) {
        pro.forEach(kafkaProducerService::produce);
    }
}
Producer class
@Service
public class KafkaProducerService {

    @Value("${kafka.producer.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void produce(Professor pro) {
        kafkaTemplate.send(topic, "professor", pro);
    }
}
In my test cases I want to override KafkaTemplate so that when I call the kafkaConsumerService.professor method in a test, it produces the data to the embedded Kafka and I can validate it.
Test configuration
@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
        brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {

    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Before
    public void setUp() throws Exception {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbeded.getPartitionsPerTopic());
        }
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }
}
Test class
@EnableKafka
@SpringBootTest(classes = {KafkaProducerConfigTest.class})
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    @Test
    public void testReceive() throws Exception {
        kafkaConsumerService.professor(Arrays.asList(new Professor()));
        //How to check messages is sent to kafka?
    }
}
Error
The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered.
A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
Can someone also help me with how to validate the messages sent to the embedded Kafka server?
Note: I'm also getting some deprecation warnings:
The type KafkaEmbedded is deprecated
The method getPartitionsPerTopic() from the type KafkaEmbedded is deprecated
The method producerProps(KafkaEmbedded) from the type KafkaTestUtils is deprecated
Recommended answer
Spring Boot 2.1 disables bean overriding by default, to prevent a bean from being overridden accidentally. If you rely on overriding, you need to set spring.main.allow-bean-definition-overriding to true.
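As a minimal sketch, the property named in the error message can be set as a plain Spring Boot property. Where it goes is an assumption here: any property source that the test context loads should work, for example the properties attribute of @SpringBootTest or a test-only properties file.

```properties
# Assumption: this line lives in a properties source loaded by the test context.
# It lets the test's kafkaTemplate bean replace the one from KafkaProducerConfig.
spring.main.allow-bean-definition-overriding=true
```

Scoping the setting to tests keeps the default protection against accidental overrides in the production configuration.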
Regarding the deprecations: see the javadocs for @EmbeddedKafka. KafkaEmbedded is replaced by EmbeddedKafkaBroker.