Unable to decode Custom object at Avro Consumer end in Kafka


Problem Description

I have a concrete class which I am serializing to a byte array to send to a Kafka topic. For serialization I am using ReflectDatumWriter. Before sending the byte[], I put the schema ID in the first 4 bytes, following some online tutorials.

I am able to send the message, but when consuming it in the Avro console consumer I get this response:

./bin/kafka-avro-console-consumer --bootstrap-server 0:9092 --property schema.registry.url=http://0:8081 --property print.key=true --topic Test

"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"

    MParams ddb = new MParams();
    ddb.setKey("ss");

    for (int i = 0; i < 10; i++) {
        // Hand-built payload: magic byte + hardcoded schema ID 1 + raw Avro bytes
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>("Test", "1", build(1, Producer.serialize(ddb)));
        Future<RecordMetadata> resp = kafkaFullAckProducer.send(record);

        System.out.println("Success " + resp.get());
    }

public static <T> byte[] serialize(T data) {
    if (data == null) {
        throw new RuntimeException("Data cannot be null in AvroByteSerializer");
    }
    try {
        // Derive the writer schema from the POJO via reflection
        Schema schema = ReflectData.get().getSchema(data.getClass());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(data, EncoderFactory.get().directBinaryEncoder(out, null));
        return out.toByteArray();
    } catch (java.io.IOException e) {
        throw new RuntimeException("Error serializing Avro message", e);
    }
}

public static byte[] build(Integer schemaId, byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Confluent wire format: magic byte 0x0, 4-byte schema ID, then the Avro payload
    out.write(0);
    try {
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        out.write(data);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException e) {
        throw new RuntimeException("Exception in avro record builder, msg: " + e.getMessage());
    }
}

@Data
public class MParams extends MetricParams{

    // POJO version

    @Nullable
    private String key;


}

@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "@c")
@Union(value= {MParams.class})
public abstract class MetricParams {

}

Working serializer snippet:

public byte[] serialize(String topic, T record) {
    Schema schema;
    int id;
    try {
        // Derive the schema via reflection and register it under the
        // "<topic>-value" subject; the registry returns the real schema ID
        schema = ReflectData.get().getSchema(record.getClass());
        id = client.register(topic + "-value", schema);
    } catch (IOException | RestClientException e) {
        throw new RuntimeException(e);
    }
    return serializeImpl(id, schema, record);
}

protected byte[] serializeImpl(int id, Schema schema, T object) throws SerializationException {
    if (object == null) {
        return null;
    }
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Confluent wire format: magic byte, then the registry-assigned 4-byte ID
        out.write(0x0);
        out.write(ByteBuffer.allocate(4).putInt(id).array());

        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(object, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error serializing Avro message", e);
    }
}
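
For completeness, the client field above is presumably a Confluent SchemaRegistryClient. A hedged construction sketch (the URL and cache capacity are illustrative):

    // CachedSchemaRegistryClient caches registered schemas to avoid repeated
    // REST calls; 100 is the identity-map capacity
    SchemaRegistryClient client = new CachedSchemaRegistryClient("http://0:8081", 100);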

Deserializer:

protected T deserialize(Schema schema, byte[] payload) throws SerializationException {
    // Even if the caller requests schema & version, a null payload cannot
    // include them. The caller must handle this case.
    if (payload == null) {
        return null;
    }

    int id = -1;
    try {
        ByteBuffer buffer = getByteBuffer(payload);
        id = buffer.getInt();
        // Payload length minus the magic byte and the 4-byte schema ID
        int length = buffer.limit() - 1 - 4;

        int start = buffer.position() + buffer.arrayOffset();
        DatumReader<T> reader = new ReflectDatumReader<T>(schema);
        T res = reader.read(null, DecoderFactory.get().binaryDecoder(buffer.array(), start, length, null));
        return res;
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error deserializing Avro message for id " + id, e);
    }
}

private ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != 0x0) {
        throw new SerializationException("Unknown magic byte!");
    }
    return buffer;
}
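
Note that deserialize takes the writer schema as a parameter; in practice you would fetch that schema from the registry using the ID embedded in the payload rather than assuming it. A sketch, assuming the same SchemaRegistryClient as above (getById is the older client API; newer versions expose getSchemaById):

    ByteBuffer buffer = getByteBuffer(payload);   // validates the magic byte
    int id = buffer.getInt();                     // registry-assigned schema ID
    Schema writerSchema = client.getById(id);     // fetch the schema that wrote this record
    T value = deserialize(writerSchema, payload);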

Recommended Answer

"For serialization I am using ReflectDatumWriter. Before sending the byte[], I put the schema ID in the first 4 bytes"

It's not clear why you are trying to bypass the KafkaAvroSerializer class's default behavior. (In your case, remove Schema.Parser from that example and use your Reflect record type rather than GenericRecord.)

You can use your concrete class as the producer's second type parameter, and as long as it maps onto the base Avro classes it should be serialized correctly (meaning the ID is computed by the registry, not some number you create, and converted to bytes), registered with the registry, then sent to Kafka.

Most importantly, the schema ID is not necessarily 1 in the registry. By hardcoding it, the console consumer may try to deserialize your messages with the wrong schema, resulting in the wrong output.
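
You can check which ID the registry actually assigned by querying its REST API (assuming the registry at http://0:8081 and the Test-value subject used above):

    curl http://0:8081/subjects/Test-value/versions/latest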

In other words, try:

ProducerRecord<String, MParams> record = new ProducerRecord<>(...)
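
A minimal sketch of that approach, assuming Confluent's KafkaAvroSerializer is on the classpath and the registry runs at http://0:8081 (the schema.reflection flag for reflection-based POJOs exists only in newer serializer versions):

    Properties props = new Properties();
    props.put("bootstrap.servers", "0:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", KafkaAvroSerializer.class.getName());
    props.put("schema.registry.url", "http://0:8081");
    // Assumption: newer Confluent versions support reflection-based records via
    // props.put("schema.reflection", "true");

    Producer<String, MParams> producer = new KafkaProducer<>(props);
    MParams ddb = new MParams();
    ddb.setKey("ss");
    // The serializer registers the schema and embeds the real ID for you
    producer.send(new ProducerRecord<>("Test", "1", ddb));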
