Deserialization without confluent schema registry: Avro serialized data doesn't contain avro schema


Problem Description


I have been trying to serialize an Avro generic record and generate Avro-serialized data to send to Kafka. The main goal is to avoid using the Confluent Schema Registry to store the schema, and instead send the schema along with the serialized data so that it can be extracted from the Kafka topic and used for deserialization.

Below is the part of the AvroSerializer that generates the Avro data.


  @Override
  public byte[] serialize(String topic, T data) {
    try {
      byte[] result = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);

        // Write the record as raw Avro binary; this produces only the
        // datum bytes, with no schema attached.
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
            EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close();
        result = byteArrayOutputStream.toByteArray();
      }

      return result;
    } catch (IOException ex) {
      throw new SerializationException(
          "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }

The serialized data present in Kafka (shown in the original post as a snapshot of a console consumer) contains only the raw Avro bytes.

The AvroDeserializer part looks like this.

  @Override
  public T deserialize(String topic, byte[] data) {

    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

        // schemaString is a field on this class holding the writer schema as JSON;
        // the raw bytes on the topic carry no schema of their own.
        Schema schema = new Schema.Parser().parse(schemaString);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);
        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);
      }

      return result;

    } catch (Exception ex) {
      throw new SerializationException(
          "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }


The producer is shown below.

public class KafkaAvroProducerUtil {

    public Future<RecordMetadata> produceTokafka(GenericRecord object) throws IOException {

        Properties properties = new Properties();
        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");
        // avro part: the custom AvroSerializer above is registered as the value serializer
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", AvroSerializer.class.getName());

        String topic = "avro";

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties);
        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<String, GenericRecord>(
                topic, object
        );

        Future<RecordMetadata> data = producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata);
                } else {
                    exception.printStackTrace();
                }
            }
        });

        producer.flush();
        producer.close();

        return data;
    }
}

When I try to deserialize this, it says a schema is needed. The problem, as I understand it, is that, as you can see in the data above (a snapshot of a consumer running on the command line), the schema is not sent along with it. How can I send the schema together with the data, so that I can deserialize using the schema that arrives with it?

Solution

EDITS: I approached the answer in two ways, as per the suggestions of @OneCricketeer and @ChinHuang.

Both approaches are explained below, but the full code is shown only for the header approach.

APPROACH 1: Sending schema along with data

In this approach I serialized the Avro schema as a string, appended a delimiter, and then appended the serialized data, sending the combined bytes to the Kafka topic.

When deserializing, after reading the record from the Kafka topic, I split the byte array into schema and data using the delimiter. I then converted the schema bytes back into a Schema object and used that schema to deserialize the data.
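
To make the idea concrete, below is a minimal sketch of that approach (hypothetical class and delimiter names, not my exact code). The schema JSON and the Avro payload are simply concatenated around a delimiter that is assumed never to occur in the schema, which is exactly the weak point listed in the cons below.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaPlusDataCodec {

    // Assumed to never appear inside the schema JSON -- the weak point of this approach.
    private static final byte[] DELIMITER = "##schema-end##".getBytes(StandardCharsets.UTF_8);

    public static byte[] serialize(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(record.getSchema().toString().getBytes(StandardCharsets.UTF_8)); // 1. schema JSON
        out.write(DELIMITER);                                                      // 2. delimiter
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);     // 3. Avro payload
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static GenericRecord deserialize(byte[] bytes) throws IOException {
        // ISO-8859-1 maps every byte to a char one-to-one, so the char index of the
        // delimiter equals its byte index; the delimiter itself is plain ASCII.
        int idx = new String(bytes, StandardCharsets.ISO_8859_1)
                .indexOf(new String(DELIMITER, StandardCharsets.ISO_8859_1));
        Schema schema = new Schema.Parser().parse(new String(bytes, 0, idx, StandardCharsets.UTF_8));
        Decoder decoder = DecoderFactory.get().binaryDecoder(
                bytes, idx + DELIMITER.length, bytes.length - idx - DELIMITER.length, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}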

Cons of the approach: as @OneCricketeer said,

  1. It is definitely not performant
  2. The whole approach breaks if the delimiter ever appears inside the schema itself

APPROACH 2: Sending schema in the header

Here, rather than sending the schema along with the data, the schema is sent in a record header.

The methods in the Serializer class are shown below.


  @Override
  public byte[] serialize(String topic, T data) {
    // Not used directly: the Headers-aware overload below is the one Kafka calls.
    return null;
  }

  @Override
  public byte[] serialize(String topic, Headers headers, T data) {
    try {
      byte[] payload = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
            EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);

        // The schema travels in a record header instead of inside the payload.
        byte[] schemaBytes = data.getSchema().toString().getBytes();

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close();

        payload = byteArrayOutputStream.toByteArray();

        headers.add("schema", schemaBytes);
        LOGGER.info("headers added");
      }

      return payload;
    } catch (IOException ex) {
      throw new SerializationException(
          "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }
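
One thing worth knowing here: the Headers-aware serialize overload is only invoked automatically on reasonably recent clients. As far as I know, Kafka 2.1.0 (KIP-336) added the default serialize/deserialize variants that receive the record Headers; on older clients the separate ExtendedSerializer/ExtendedDeserializer interfaces served the same purpose.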

The Deserializer methods are shown below.



  @Override
  public T deserialize(String topic, byte[] data) {
    // Not used directly: the Headers-aware overload below is the one Kafka calls.
    return null;
  }

  @Override
  public T deserialize(String topic, Headers headers, byte[] data) {
    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

        // Recover the writer schema from the record header added by the serializer.
        Header header = headers.lastHeader("schema");
        String schemaString = new String(header.value());

        Schema schema = new Schema.Parser().parse(schemaString);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);

        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);
      }

      return result;

    } catch (Exception ex) {
      throw new SerializationException(
          "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }
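
For completeness, here is a minimal consumer sketch under the same assumptions (the header-aware AvroDeserializer above as the value deserializer, and the "avro" topic used by the producer in the question). Kafka hands each record's headers to the deserializer, so no registry lookup is needed.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaAvroConsumerUtil {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("group.id", "avro-consumer");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", AvroDeserializer.class.getName());

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("avro"));
            while (true) {
                // The client invokes the Headers-aware deserialize(...) for each record,
                // so the "schema" header written by the serializer is available there.
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}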
