kafka消息的多种实现(java泛型) [英] Multiple implementations of kafka message (java Generics)
问题描述
我正在尝试根据事件的类型为kafka事件创建多个实现类.
I'm trying to create multiple implementation classes for a kafka event based on the type of event.
public class KafkaListener {
@Autowired
Service service;
@KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")
public void consumeSource(Object event) {
service.process(event);
}
}
public interface Service<E> {
void process(E event);
}
public class ServiceImpl1 implements Service<Event1> {
void process(Event1 event1) {
// process
}
}
public class ServiceImpl2 implements Service<Event2> {
void process(Event2 event2) {
// process
}
}
//Event1 & Event2 are 2 POJO classes with different inputs
是否有可能实现或者我应该创建多个侦听器,每种事件类型一个?
Is it possible to implement or am I supposed to create multiple listener, one for each event type?
推荐答案
只要Kafka对事件进行反序列化,就可以将类级别 @KafkaListener
与方法级别 @KafkaHandler一起使用
s.
As long as the events are deserialized by Kafka, you can use a class level @KafkaListener
with method level @KafkaHandler
s.
请参见该文档.
在类级别使用
@KafkaListener
时,必须在方法级别指定@KafkaHandler
.传递消息时,将使用转换后的消息有效负载类型来确定要调用的方法.以下示例显示了如何执行此操作:
When you use
@KafkaListener
at the class-level, you must specify@KafkaHandler
at the method level. When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true`)
public void listenDefault(Object object) {
...
}
}
从版本2.1.3开始,可以将@KafkaHandler方法指定为默认方法,如果其他方法不匹配,则将调用该默认方法.最多只能指定一种方法.使用
@KafkaHandler方法
时,有效负载必须已经转换为域对象(因此可以执行匹配).使用其TypePrecedence设置为TYPE_ID的自定义解串器,JsonDeserializer
或JsonMessageConverter
.有关更多信息,请参见序列化,反序列化和消息转换.
Starting with version 2.1.3, you can designate a @KafkaHandler method as the default method that is invoked if there is no match on other methods. At most, one method can be so designated. When using
@KafkaHandler methods
, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, theJsonDeserializer
, or theJsonMessageConverter
with its TypePrecedence set to TYPE_ID. See Serialization, Deserialization, and Message Conversion for more information.
这篇关于kafka消息的多种实现(java泛型)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!