Spring Kafka:ApplicationContext中不同对象的多个侦听器 [英] Spring Kafka: Multiple Listeners for different objects within an ApplicationContext
问题描述
我可以向社区咨询听多个主题的最佳方法是什么,每个主题都包含不同类别的消息吗?
Can I please check with the community what is the best way to listen to multiple topics, with each topic containing a message of a different class?
在过去的几天里,我一直在和Spring Kafka一起玩.到目前为止,我的思考过程是:
I've been playing around with Spring Kafka for the past couple of days. My thought process so far:
-
因为初始化KafkaListenerContainerFactory时需要将反序列化器传递到DefaultKafkaConsumerFactory中.这似乎表明,如果我需要多个容器,每个容器反序列化不同类型的消息,则将无法使用@EnableKafka和@KafkaListener批注.
Because you need to pass your deserializer into DefaultKafkaConsumerFactory when initializing a KafkaListenerContainerFactory. This seems to indicate that if I need multiple containers each deserializing a message of a different type, I will not be able to use the @EnableKafka and @KafkaListener annotations.
这使我认为,唯一的方法是实例化多个KafkaMessageListenerContainers.
This leads me to think that the only way to do so would be to instantiate multiple KafkaMessageListenerContainers.
鉴于KafkaMessageListenerContainers是单线程的,并且我需要同时收听多个主题,所以我确实应该使用多个ConcurrentKafkaMessageListenerContainers.
And given that KafkaMessageListenerContainers is single threaded and I need to listen to multiple topics at the same time, I really should be using multiple ConcurrentKafkaMessageListenerContainers.
我会在这里走上正确的路吗?有更好的方法吗?
Would I be on the right track here? Is there a better way to do this?
谢谢!
推荐答案
这是一个非常简单的示例.
Here is a very simple example.
// -----------------------------------------------
// Sender
// -----------------------------------------------
@Configuration
public class SenderConfig {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
......
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Class1> producerFactory1() {
return new DefaultKafkaProducerFactory<String, Class1>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Class1> kafkaTemplate1() {
return new KafkaTemplate<>(producerFactory1());
}
@Bean
public Sender1 sender1() {
return new Sender1();
}
//-------- send the second class --------
@Bean
public ProducerFactory<String, Class2> producerFactory2() {
return new DefaultKafkaProducerFactory<String, Class2>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Class2> kafkaTemplate2() {
return new KafkaTemplate<>(producerFactory2());
}
@Bean
public Sender2 sender2() {
return new Sender2();
}
}
public class Sender1 {
@Autowired
private KafkaTemplate<String, Class1> kafkaTemplate1;
public void send(String topic, Class1 c1) {
kafkaTemplate1.send(topic, c1);
}
}
public class Sender2 {
@Autowired
private KafkaTemplate<String, Class2> kafkaTemplate2;
public void send(String topic, Class2 c2) {
kafkaTemplate2.send(topic, c2);
}
}
// -----------------------------------------------
// Receiver
// -----------------------------------------------
@Configuration
@EnableKafka
public class ReceiverConfig {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
......
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, Class1> consumerFactory1() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Class1.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Class1> kafkaListenerContainerFactory1() {
ConcurrentKafkaListenerContainerFactory<String, Class1> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory1());
return factory;
}
@Bean
public Receiver1 receiver1() {
return new Receiver1();
}
//-------- add the second listener
@Bean
public ConsumerFactory<String, Class2> consumerFactory2() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Class2.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Class2> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, Class2> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory2());
return factory;
}
@Bean
public Receiver2 receiver2() {
return new Receiver2();
}
}
public class Receiver1 {
@KafkaListener(id="listener1", topics = "topic1", containerFactory = "kafkaListenerContainerFactory1")
public void receive(Class1 c1) {
LOGGER.info("Received c1");
}
}
public class Receiver2 {
@KafkaListener(id="listener2", topics = "topic2", containerFactory = "kafkaListenerContainerFactory2")
public void receive(Class2 c2) {
LOGGER.info("Received c2");
}
}
这篇关于Spring Kafka:ApplicationContext中不同对象的多个侦听器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!