如何在 Spring Kafka 中包含用于反序列化的类型元数据 [英] How to include Type MetaData for Deserialization in Spring Kafka

查看:23
本文介绍了如何在 Spring Kafka 中包含用于反序列化的类型元数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在 Spring Kafka 中的 Listener 上进行反序列化.但这假设类型信息是由 Spring Kafka 生产者包含或发送的.在我的例子中,Json 是由 Debezium MySQLConnector 发送的,它没有添加这个元数据.所以我想把它添加到请求中.我了解它放置在 JsonSerializer 中某个位置的请求中,并且我查看了源代码,但无法弄清楚如何在序列化期间将元数据类型一般地使用它添加到请求中.特别是哪个字段保存了这种类型的信息?它是序列化的java对象的类名吗?我不认为仅仅设置一个默认的序列化器会起作用,因为我有多个消费者在听不同的主题,正如人们所期望的.除了最简单的情况外,这无法设置一个默认值,因为我有许多要反序列化的使用者和类型.所以这个答案在我的情况下不起作用 Kafka - 在消费者中反序列化对象

I am doing deserialization at the Listener in Spring Kafka. But this assumes that the type information was included or sent by a Spring Kafka producer. In my case the Json is being sent across by the Debezium MySQLConnector and it does not add this meta data. So I would like to add it to the requests. I understand its placed in the request somewhere in the JsonSerializer, and I looked at the source code but could not figure out exactly how to use this to add meta data type during serialization generically to a request. In particular what field holds this type information? And is it the class name of the java object that was serialized? I dont think just setting a default serializer is going to work because I have multiple consumers listening on different topics as one would expect. Except for the simplest cases this is just not going to work to set one default as i have many consumers and types that I am deserializing to. So this answer is not going to work in my case Kafka - Deserializing the object in Consumer

更新尝试在反序列化器上使用方法类型,但有另一个问题:KafkaSpring Deserialzer returnType 静态方法从未调用

Update tried using Method Types on deserializer but have another issue: Kafka Spring Deserialzer returnType static method never called

推荐答案

查看

public abstract class AbstractJavaTypeMapper implements BeanClassLoaderAware {

    /**
     * Default header name for type information.
     */
    public static final String DEFAULT_CLASSID_FIELD_NAME = "__TypeId__";

    /**
     * Default header name for container object contents type information.
     */
    public static final String DEFAULT_CONTENT_CLASSID_FIELD_NAME = "__ContentTypeId__";

    /**
     * Default header name for map key type information.
     */
    public static final String DEFAULT_KEY_CLASSID_FIELD_NAME = "__KeyTypeId__";

    /**
     * Default header name for key type information.
     */
    public static final String KEY_DEFAULT_CLASSID_FIELD_NAME = "__Key_TypeId__";

    /**
     * Default header name for key container object contents type information.
     */
    public static final String KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME = "__Key_ContentTypeId__";

    /**
     * Default header name for key map key type information.
     */
    public static final String KEY_DEFAULT_KEY_CLASSID_FIELD_NAME = "__Key_KeyTypeId__";

2 组标题(键和值).

2 sets of headers (keys and values).

TypeId 用于简单的类

如果 TypeId 是一个容器 List

ContentTypeId 是包含的类型.

如果 TypeId 是一个 Map

Key_TypeId 是密钥类型.

这允许你重建一个Map.

这些标头可以包含完全限定的类名,也可以包含通过 classIdMappings 映射映射到类名的标记.

These headers can either contain fully qualified class names, or tokens that map to class names via the classIdMappings map.

但是,从2.5版开始,使用新的会更容易

However, since version 2.5, it would be easier to use the new

使用方法确定类型.

这样,您可以设置自己的标题并在方法中检查它们.

That way, you can set your own headers and examine them in the method.

编辑

这是一个简单的例子:

@SpringBootApplication
public class Gitter76Application {

    public static void main(String[] args) {
        SpringApplication.run(Gitter76Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("gitter76").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "gitter76", topics = "gitter76")
    public void listen(Foo in) {
        System.out.println(in);
    }

}

public class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.demo

$ kafkacat -P -b localhost:9092 -t gitter76 -H __TypeId__=com.example.demo.Foo
{"bar":"baz"}
^C

2020-08-08 09:32:10.034  INFO 58146 --- [ gitter76-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : gitter76: partitions assigned: [gitter76-0]
Foo [bar=baz]

这篇关于如何在 Spring Kafka 中包含用于反序列化的类型元数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆