Serde class for AVRO primitive type

Problem description

I'm writing a Kafka Streams app in Java that takes input topics created by a connector that uses the schema registry and Avro for both the key and value converter. The connector produces the following schemas:

key-schema: "int"
value-schema: {
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "firstname", "type": "string"},
    {"name": "lastname",  "type": "string"}
  ]
}

Actually, there are several topics; the key-schema is always "int" and the value-schema is always a record of some kind (User, Product, etc.). My code contains the following definitions:

Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);

Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false); // false = configure as a value serde

At first I tried consuming the topic with something like Consumed.with(Serdes.Integer(), userSerde); but that did not work, because Serdes.Integer() expects integers to be encoded using 4 bytes while Avro uses a variable-length encoding. Using Consumed.with(Serdes.Bytes(), userSerde); worked, but I really wanted int and not bytes, so I changed my code to this:

KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer();
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true); // true = configure as a key (de)serializer
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);

This made the compiler produce a warning (it doesn't like the (Serde<Integer>)(Serde) cast), but it allows me to use

Consumed.with(keySerde, userSerde); and get an integer as the key. This works just fine and my app is behaving as expected (great!!!). But now I want to define a default serde for the key/value and I cannot get it to work.
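
For context, this is roughly how that ends up wired into the topology (the builder variable and the "users" topic name are my own illustration, not from the original post):

StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, User> users = builder.stream("users", Consumed.with(keySerde, userSerde));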

Setting the default value serde is simple:

streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

However, I cannot figure out how to define the default key serde.

I tried:

  1. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName()); produces the runtime error: Could not find a public no-argument constructor for org.apache.kafka.common.serialization.Serdes$WrapperSerde
  2. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); produces the runtime error: java.lang.Integer cannot be cast to org.apache.avro.specific.SpecificRecord

What am I missing? Thanks.

Answer

Update (5.5 and newer)

Confluent version 5.5 adds native support for primitive Avro types via PrimitiveAvroSerde (cf. https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/PrimitiveAvroSerde.java).
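
A rough sketch of how this could be used for the integer key, assuming the Confluent 5.5+ kafka-streams-avro-serde artifact is on the classpath (variable names are mine):

Serde<Integer> keySerde = new PrimitiveAvroSerde<>();
keySerde.configure(Collections.singletonMap("schema.registry.url", schemaRegistryUrl), true); // true = key serde

// or register it as the default key serde:
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class);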

Original answer (5.4 and older):

It's a known issue. Primitive Avro types don't work well with Confluent's AvroSerdes, because those Serdes only work with GenericRecord and SpecificRecord.

Compare https://github.com/confluentinc/schema-registry/tree/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro.

Thus, building your own Serde based on KafkaAvroSerializer and KafkaAvroDeserializer is the right approach. To be able to pass it into the config as the default Serde, you cannot use Serdes.serdeFrom, because the type information is lost due to generic type erasure.

However, you can instead implement your own class that implements the Serde interface, and pass that custom class into the config:

public class MySerde implements Serde<Integer> {
    // use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer`
}

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);
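
A fuller sketch of what such a class might look like, using the same Confluent KafkaAvroSerializer/KafkaAvroDeserializer as in the question; the class name IntegerAvroSerde and the wiring details are my own illustration, not the answer's exact implementation:

import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class IntegerAvroSerde implements Serde<Integer> {

    private final KafkaAvroSerializer innerSerializer = new KafkaAvroSerializer();
    private final KafkaAvroDeserializer innerDeserializer = new KafkaAvroDeserializer();

    // Kafka Streams instantiates the default serde via this no-argument constructor
    // and then calls configure() with the streams config, so schema.registry.url
    // must be present in streamsConfiguration.
    public IntegerAvroSerde() { }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        innerSerializer.configure(configs, isKey);
        innerDeserializer.configure(configs, isKey);
    }

    @Override
    public Serializer<Integer> serializer() {
        return new Serializer<Integer>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public byte[] serialize(String topic, Integer data) {
                return innerSerializer.serialize(topic, data);
            }

            @Override
            public void close() { }
        };
    }

    @Override
    public Deserializer<Integer> deserializer() {
        return new Deserializer<Integer>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public Integer deserialize(String topic, byte[] data) {
                // KafkaAvroDeserializer returns Object; for an "int" key schema this is an Integer
                return (Integer) innerDeserializer.deserialize(topic, data);
            }

            @Override
            public void close() { }
        };
    }

    @Override
    public void close() {
        innerSerializer.close();
        innerDeserializer.close();
    }
}

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, IntegerAvroSerde.class);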
