spring Kafka 与嵌入式 Kafka 的集成测试 [英] spring Kafka integration testing with embedded Kafka

查看:56
本文介绍了spring Kafka 与嵌入式 Kafka 的集成测试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 Spring Boot 应用程序,它有一个消费者从一个集群中的主题消费,并在不同集群中生产另一个主题.

I have spring boot application that had a consumer consumes from topic in one cluster and produces to another topic in different cluster.

现在我正在尝试使用 spring 嵌入式 Kafka 编写集成测试用例,但遇到问题 KafkaTemplate 无法注册.类路径资源中已经定义了具有该名称的 bean

Now I'm trying to write integration test case using spring embedded Kafka but having an issue KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource

消费类

@Service
public class KafkaConsumerService {

@Autowired
private KafkaProducerService kafkaProducerService;

@KafkaListener(topics = "${kafka.producer.topic}")
public void professor(List<Professor> pro) {
    pro.forEach(kafkaProducerService::produce);
    
   }

}

制作人类

@Service
public class KafkaProducerService {

@Value("${kafka.producer.topic}")
private String topic;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void produce(Professor pro) {
    kafkaTemplate.send(topic,"professor",pro);
  }

 }

在我的测试用例中,我想覆盖 KafkaTemplate 以便当我在 Test 中调用 kafkaConsumerService.professor 方法时,它应该将数据生成到嵌入式 Kafka,我应该验证它.

In my Test cases I want to override KafkaTemplate so that when i call kafkaConsumerService.professor method in Test it should produce the data into embedded Kafka and i should validate it.

测试配置

@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {

@Autowired
 KafkaEmbedded kafkaEmbeded;

@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;
   }

 }

测试类

@EnableKafka
@SpringBootTest(classes = {KafkaProducerConfigTest.class})
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

@Autowired
private KafkaConsumerService kafkaConsumerService;

@Test
public void testReceive() throws Exception {
     kafkaConsumerService.professor(Arrays.asList(new Professor()));
     
     //How to check messages is sent to kafka?
}

 }

错误

 The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered. 
 A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
 Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true


还有谁能帮我验证发送到嵌入式 Kafka 服务器的消息吗?


And also can some one help me how to validate messages sent into embedded Kafka server?

注意我有一些已弃用的警告

KafkaEmbedded 类型已弃用

The type KafkaEmbedded is deprecated

不推荐使用来自 KafkaEmbedded 类型的 getPartitionsPerTopic() 方法

The method getPartitionsPerTopic() from the type KafkaEmbedded is deprecated

不推荐使用 KafkaTestUtils 类型中的 producerProps(KafkaEmbedded) 方法

The method producerProps(KafkaEmbedded) from the type KafkaTestUtils is deprecated

推荐答案

Boot 2.1 默认禁用 bean 覆盖.

默认情况下已禁用 Bean 覆盖,以防止 Bean 被意外覆盖.如果您依赖覆盖,则需要将 spring.main.allow-bean-definition-overriding 设置为 true.

Bean overriding has been disabled by default to prevent a bean being accidentally overridden. If you are relying on overriding, you will need to set spring.main.allow-bean-definition-overriding to true.

关于弃用;请参阅 @EmbeddedKafka 的 javadoc.它被 EmbeddedKafkaBroker 取代.

Regarding the deprecations; see the javadocs for @EmbeddedKafka. It is replaced by EmbeddedKafkaBroker.

这篇关于spring Kafka 与嵌入式 Kafka 的集成测试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆