KafkaAvroSerializer for serializing Avro without schema.registry.url


Problem Description

I'm a noob to Kafka and Avro, so I have been trying to get a producer/consumer running. So far I have been able to produce and consume simple bytes and strings, using the following configuration for the producer:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    // Injection comes from Twitter's Bijection library (com.twitter.bijection.avro.GenericAvroCodecs)
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);
        Thread.sleep(250);
    }
    producer.close();

Now this is all well and good; the problem comes when I try to serialize a POJO. I was able to get the Avro schema from the POJO using the utility provided with Avro, hardcoded the schema, and then tried to create a GenericRecord to send through the KafkaProducer. The producer is now set up as:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA); // this is the generated Avro schema
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

This is where the problem is: the moment I use KafkaAvroSerializer, the producer fails to start due to: missing mandatory parameter: schema.registry.url.

I read up on why this is required: it lets my consumer decipher whatever the producer sends. But isn't the schema already embedded in the Avro message? It would be really great if someone could share a working example of using KafkaProducer with KafkaAvroSerializer without having to specify schema.registry.url.

I would also really appreciate any insights/resources on the utility of the schema registry.

Thanks!

Recommended Answer

Note first: KafkaAvroSerializer is not provided in vanilla Apache Kafka; it is provided by Confluent Platform (https://www.confluent.io/) as part of its open-source components (http://docs.confluent.io/current/platform.html#confluent-schema-registry).

Quick answer: no, if you use KafkaAvroSerializer, you will need a schema registry. See some samples here: http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
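For concreteness, here is a minimal sketch of the producer configuration once a registry is available. The registry address `http://localhost:8081` is an assumption (the Schema Registry's default port); note also that the real serializer class lives in Confluent's `io.confluent.kafka.serializers` package, not `org.apache.kafka.common.serialization` as in the question's code.

```java
import java.util.Properties;

public class AvroProducerConfig {
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaAvroSerializer is in Confluent's io.confluent.kafka.serializers package
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // the mandatory parameter the error message complains about;
        // localhost:8081 is assumed here (the registry's default port)
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("schema.registry.url"));
    }
}
```

With these properties, the producer would be declared as `KafkaProducer<String, GenericRecord>` rather than `KafkaProducer<String, byte[]>`, since the serializer accepts Avro records directly.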

The basic idea of the schema registry is that each topic refers to an Avro schema (i.e., you can only send data coherent with each other). But a schema can have multiple versions, so you still need to identify the schema for each record.

We don't want to write the schema with every piece of data as you imply; often the schema is bigger than your data! Parsing it on every read would be a waste of time, and a waste of resources (network, disk, CPU).

Instead, a schema registry instance maintains a binding avro schema <-> int schemaId, and the serializer then writes only this id before the data, after getting it from the registry (and caching it for later use).

So inside Kafka, your record will be [<id> <avro bytes>] (plus a magic byte, for technical reasons), which is an overhead of only 5 bytes (compare that to the size of your schema). When reading, your consumer will look up the schema corresponding to the id and deserialize the Avro bytes against it. You can find much more in the Confluent docs.

If you really have a use case where you want to write the schema with every record, you will need another serializer (writing your own should be easy: just reuse https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java and replace the schema-registry part with the schema itself, and do the same for reading). But if you use Avro, I would really discourage this: sooner or later, you will need to implement something like a schema registry to manage versioning.

