kafka消息的多种实现(java泛型) [英] Multiple implementations of kafka message (java Generics)

查看:103
本文介绍了kafka消息的多种实现(java泛型)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试根据事件类型为 kafka 事件创建多个实现类.

公共类KafkaListener {@自动连线服务服务;@KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")公共无效消费来源(对象事件){服务进程(事件);}}公共接口 Service<E>{无效过程(E事件);}公共类 ServiceImpl1 实现了 Service{无效过程(事件1事件1){//过程}}公共类 ServiceImpl2 实现了 Service{无效过程(事件2事件2){//过程}}//事件1 &Event2 是 2 个具有不同输入的 POJO 类

是否可以实现或者我应该创建多个侦听器,每个事件类型一个?

解决方案

只要事件被 Kafka 反序列化,就可以使用类级别 @KafkaListener 和方法级别 @KafkaHandlers.

参见 文档.

<块引用>

在类级别使用@KafkaListener时,必须在方法级别指定@KafkaHandler.传递消息时,转换后的消息有效负载类型用于确定要调用的方法.以下示例显示了如何执行此操作:

@KafkaListener(id = "multi", topics = "myTopic")静态类 MultiListenerBean {@KafkaHandler公共无效听(字符串 foo){...}@KafkaHandler公共无效听(整数条){...}@KafkaHandler(isDefault = true`)公共无效listenDefault(对象对象){...}}

<块引用>

从 2.1.3 版本开始,您可以指定一个 @KafkaHandler 方法作为默认方法,如果其他方法没有匹配项,则调用该方法.最多只能指定一种方法.使用 @KafkaHandler 方法 时,payload 必须已经转换为域对象(因此可以执行匹配).使用自定义解串器、JsonDeserializerJsonMessageConverter 并将其 TypePrecedence 设置为 TYPE_ID.有关详细信息,请参阅序列化、反序列化和消息转换.

I'm trying to create multiple implementation classes for a kafka event based on the type of event.

public class KafkaListener {
    @Autowired
    Service service;

    @KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")
    public void consumeSource(Object event) {
        service.process(event);
    }
}

public interface Service<E> {
    void process(E event);
}

public class ServiceImpl1 implements Service<Event1> {
    void process(Event1 event1) {
         // process 
    }
}

public class ServiceImpl2 implements Service<Event2> {
    void process(Event2 event2) {
         // process 
    }
}

//Event1 & Event2 are 2 POJO classes with different inputs

Is it possible to implement or am I supposed to create multiple listener, one for each event type?

解决方案

As long as the events are deserialized by Kafka, you can use a class level @KafkaListener with method level @KafkaHandlers.

See the documentation.

When you use @KafkaListener at the class-level, you must specify @KafkaHandler at the method level. When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true`)
    public void listenDefault(Object object) {
        ...
    }

}

Starting with version 2.1.3, you can designate a @KafkaHandler method as the default method that is invoked if there is no match on other methods. At most, one method can be so designated. When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the JsonDeserializer, or the JsonMessageConverter with its TypePrecedence set to TYPE_ID. See Serialization, Deserialization, and Message Conversion for more information.

这篇关于kafka消息的多种实现(java泛型)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆