Deserialization without Confluent Schema Registry: Avro serialized data doesn't contain the Avro schema


Problem description

I have been trying to serialize an Avro GenericRecord and generate Avro serialized data to send to Kafka. The major goal is to not use the Confluent Schema Registry for storing the schema, but to send the schema along with the serialized data so it can be extracted from the Kafka topic and used for deserialization.

Below is the part of the AvroSerializer that generates the Avro data.


  @Override
  public byte[] serialize(String topic, T data) {
    try {
      byte[] result = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);

        // Write the record as raw Avro binary: no schema, no framing bytes.
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
            EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        // The constructor already sets the schema; no separate setSchema() call needed.
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close();
        result = byteArrayOutputStream.toByteArray();
      }

      return result;
    } catch (IOException ex) {
      throw new SerializationException(
          "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }

The serialized data present in Kafka looks like this:

[console-consumer snapshot from the original post: only the raw binary payload is visible, with no schema attached]

The AvroDeserializer part looks like this.

  @Override
  public T deserialize(String topic, byte[] data) {
    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

        // The reader schema has to come from somewhere else entirely;
        // the payload itself carries no schema.
        Schema schema = new Schema.Parser().parse(schemaString);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);
        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);
      }

      return result;

    } catch (Exception ex) {
      throw new SerializationException(
          "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }
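
For reference, this deserializer is wired into a consumer like the minimal sketch below; the group id and poll settings are placeholders, only the topic name and the AvroDeserializer class come from the snippets above:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "avro-consumer"); // hypothetical group id
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", AvroDeserializer.class.getName());

try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties)) {
    consumer.subscribe(Collections.singletonList("avro"));
    for (ConsumerRecord<String, GenericRecord> record : consumer.poll(Duration.ofSeconds(1))) {
        System.out.println(record.value()); // the deserialized GenericRecord
    }
}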


The producer looks like this.

public class KafkaAvroProducerUtil {

    public Future<RecordMetadata> produceTokafka(GenericRecord object) throws IOException {

        Properties properties = new Properties();
        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");
        // avro part
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", AvroSerializer.class.getName());

        String topic = "avro";

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties);
        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<String, GenericRecord>(
                topic, object
        );

        Future<RecordMetadata> data = producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata);
                } else {
                    exception.printStackTrace();
                }
            }
        });

        producer.flush();
        producer.close();

        return data;
    }
}
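
For illustration, the GenericRecord passed in can be built with the plain Avro API; the schema and field values in this sketch are purely hypothetical:

// Hypothetical schema for illustration only.
String schemaString = "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"age\",\"type\":\"int\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);

GenericRecord person = new GenericData.Record(schema);
person.put("name", "John");
person.put("age", 25);

new KafkaAvroProducerUtil().produceTokafka(person); // throws IOException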

When I try to deserialize this, it says a schema is needed. As I understand it, the problem is that, as you can see in the data above (the snapshot of the consumer running on cmd), the schema is not sent along with it. How can I send the schema along with the data, so that deserialization can use the schema that travels with the data?

Answer

EDITS: I have approached the answer in two ways, as per the suggestions of @OneCricketeer and @ChinHuang.

Both approaches are explained below, but only the code for the header approach is shown in full.

APPROACH 1: Sending the schema along with the data

In this approach I serialized the Avro schema as a string, appended a delimiter, and sent the schema together with the serialized data to the Kafka topic.

When deserializing, after reading the message from the Kafka topic, the byte array is split into schema and data at the delimiter. The schema bytes are then converted back into a Schema, which is used to deserialize the data.
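
A rough sketch of the idea, assuming a "|||" delimiter (an arbitrary choice) and reusing the encoder/decoder setup from the classes above; avroPayload stands for the bytes produced by the BinaryEncoder in the serializer:

// --- serializer side: schema JSON + delimiter + Avro binary payload ---
byte[] schemaBytes = data.getSchema().toString().getBytes(StandardCharsets.UTF_8);
byte[] delimiter = "|||".getBytes(StandardCharsets.UTF_8);

ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(schemaBytes);   // IOException is caught by the enclosing try in serialize()
out.write(delimiter);
out.write(avroPayload);   // bytes produced by the BinaryEncoder above
byte[] message = out.toByteArray();

// --- deserializer side: find the first occurrence of the delimiter ---
int idx = -1;
outer:
for (int i = 0; i <= message.length - delimiter.length; i++) {
    for (int j = 0; j < delimiter.length; j++) {
        if (message[i + j] != delimiter[j]) {
            continue outer;
        }
    }
    idx = i;
    break;
}

Schema schema = new Schema.Parser().parse(
        new String(Arrays.copyOfRange(message, 0, idx), StandardCharsets.UTF_8));
byte[] avroData = Arrays.copyOfRange(message, idx + delimiter.length, message.length);

DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(avroData, null));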

Cons of the approach, as @OneCricketeer said:

  1. It is definitely not a clean solution
  2. If the delimiter ever appears inside the schema, the whole approach breaks

APPROACH 2: Sending the schema in the header

Here, rather than sending the schema along with the data, the schema is sent in the record headers.

The methods in the Serializer class are shown below.


  @Override
  public byte[] serialize(String topic, T data) {
    // Not used: the producer calls the Headers-aware overload below.
    return null;
  }

  public byte[] serialize(String topic, Headers headers, T data) {
    try {
      byte[] payload = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
                EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);

        // The schema travels in a record header instead of in the payload.
        byte[] schemaBytes = data.getSchema().toString().getBytes(StandardCharsets.UTF_8);

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close();

        payload = byteArrayOutputStream.toByteArray();

        headers.add("schema", schemaBytes);
      }

      LOGGER.info("headers added");
      return payload;
    } catch (IOException ex) {
      throw new SerializationException(
              "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }

The deserializer methods are shown below.



  @Override
  public T deserialize(String topic, byte[] data) {
    // Not used: the consumer calls the Headers-aware overload below.
    return null;
  }

  public T deserialize(String topic, Headers headers, byte[] data) {
    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

        // Recover the writer schema from the record header.
        Header header = headers.lastHeader("schema");
        String schemaString2 = new String(header.value(), StandardCharsets.UTF_8);

        Schema schema = new Schema.Parser().parse(schemaString2);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);

        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);
      }

      return result;

    } catch (Exception ex) {
      throw new SerializationException(
              "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }
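
The Headers-aware overloads are the ones the Kafka clients actually invoke once record headers are available (in pre-2.1 clients the same hooks lived on the ExtendedSerializer/ExtendedDeserializer interfaces), so the producer and consumer wiring does not change. A quick round-trip check outside of Kafka might look like the sketch below; the type parameters and no-arg constructors on AvroSerializer/AvroDeserializer are assumptions based on the snippets above:

// RecordHeaders is the in-memory Headers implementation that ships with kafka-clients.
Headers headers = new RecordHeaders();

AvroSerializer<GenericRecord> serializer = new AvroSerializer<>();       // assumed generic signature
AvroDeserializer<GenericRecord> deserializer = new AvroDeserializer<>(); // assumed generic signature

byte[] payload = serializer.serialize("avro", headers, record);

// The "schema" header written by the serializer is read back here.
GenericRecord roundTripped = deserializer.deserialize("avro", headers, payload);
System.out.println(roundTripped);

Compared to approach 1 this removes the delimiter fragility entirely, at the cost of shipping the full schema JSON with every record; putting a schema fingerprint in the header and caching schemas locally would cut that overhead, which is essentially what a schema registry does for you.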

