Unable to decode Custom object at Avro Consumer end in Kafka


Problem description

I have a concrete class which I am serializing into a byte array to be sent to a Kafka topic. For serialization I am using ReflectDatumWriter. Before sending the byte[], I put the schema ID in the first 4 bytes, following some online tutorials.

I am able to send the message, but while consuming it in the Avro console consumer I get this response:

./bin/kafka-avro-console-consumer --bootstrap-server 0:9092 --property schema.registry.url=http://0:8081 --property print.key=true --topic Test

"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"

    MParams ddb = new MParams();
    ddb.setKey("ss");

    for (int i = 0; i < 10; i++) {
        // build() frames the Avro bytes with a hand-rolled magic byte and a schema ID of 1
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>("Test", "1", build(1, Producer.serialize(ddb)));
        Future<RecordMetadata> resp = kafkaFullAckProducer.send(record);

        System.out.println("Success " + resp.get());
    }
}
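For context, kafkaFullAckProducer is not shown in the question. A byte[]-valued producer along the following lines would match these calls (a minimal sketch; the bootstrap address is taken from the console command above, and the acks setting is inferred from the variable name):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "0:9092");
props.put("acks", "all"); // "full ack", inferred from the variable name
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());

KafkaProducer<String, byte[]> kafkaFullAckProducer = new KafkaProducer<>(props);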

public static <T> byte[] serialize(T data) {
    Schema schema = null;
    if (data == null) {
        throw new RuntimeException("Data cannot be null in AvroByteSerializer");
    }
    try {
        // Derive the schema from the class via reflection, then write the datum
        // directly to the stream with no framing
        schema = ReflectData.get().getSchema(data.getClass());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(data, EncoderFactory.get().directBinaryEncoder(out, null));
        byte[] bytes = out.toByteArray();
        return bytes;
    } catch (java.io.IOException e) {
        throw new RuntimeException("Error serializing Avro message", e);
    }
}

public static byte[] build(Integer schemaId, byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Confluent wire format: magic byte 0x0, then a 4-byte big-endian schema ID,
    // then the Avro-encoded datum
    out.write(0);
    try {
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        out.write(data);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException e) {
        throw new RuntimeException("Exception in avro record builder, msg: " + e.getMessage());
    }
}


@Data
public class MParams extends MetricParams{

    // POJO version

    @Nullable
    private String key;


}

@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "@c")
@Union(value= {MParams.class})
public abstract class MetricParams {

}

Working serializer snippet

public byte[] serialize(String topic, T record) {
        Schema schema;
        int id;
        try {
            schema = ReflectData.get().getSchema(record.getClass());
            id = client.register(topic + "-value", schema);
        } catch (IOException | RestClientException e) {
            throw new RuntimeException(e);
        }
        return serializeImpl(id, schema, record);
    }

    protected byte[] serializeImpl(int id, Schema schema, T object) throws SerializationException {
        if (object == null) {
            return null;
        }
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0x0);
            out.write(ByteBuffer.allocate(4).putInt(id).array());

            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
            DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
            writer.write(object, encoder);
            encoder.flush();
            byte[] bytes = out.toByteArray();
            out.close();
            return bytes;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing Avro message", e);
        }
    }
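These two methods read as if they belong to a class that implements Kafka's Serializer interface and holds a Schema Registry client in the client field. A minimal skeleton of such a wrapper follows; the class name and client construction are assumptions, not part of the original post:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical wrapper class; the two methods above slot in here
public class ReflectAvroSerializer<T> implements Serializer<T> {

    // Caches schema-to-ID lookups so the registry is only queried once per schema
    private final SchemaRegistryClient client =
            new CachedSchemaRegistryClient("http://0:8081", 100);

    // serialize(String topic, T record) and serializeImpl(int, Schema, T) as shown above
}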

Deserializer:

protected T deserialize(Schema schema, byte[] payload) throws SerializationException {
        // A null payload carries no schema or version information;
        // the caller must handle this case.
        if (payload == null) {
            return null;
        }

        int id = -1;
        try {
            ByteBuffer buffer = getByteBuffer(payload);
            id = buffer.getInt();
            // Datum length = total - 1 magic byte - 4 ID bytes
            int length = buffer.limit() - 1 - 4;

            int start = buffer.position() + buffer.arrayOffset();
            DatumReader<T> reader = new ReflectDatumReader<T>(schema);
            T res = reader.read(null, DecoderFactory.get().binaryDecoder(buffer.array(), start, length, null));
            return res;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing Avro message for id " + id, e);
        }
    }

    private ByteBuffer getByteBuffer(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != 0x0) {
            throw new SerializationException("Unknown magic byte!");
        }
        return buffer;
    }
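Note that deserialize reads the embedded ID but never uses it: it decodes with whatever schema the caller passes in. A complete implementation would fetch the writer's schema from the registry by that ID, roughly as follows (a sketch assuming the same SchemaRegistryClient field as above; getById is the older client API, replaced by getSchemaById in newer versions):

int id = buffer.getInt();
// Resolve the schema that actually wrote these bytes instead of trusting the caller's
Schema writerSchema = client.getById(id);
DatumReader<T> reader = new ReflectDatumReader<T>(writerSchema, schema);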

Answer

For serialization I am using ReflectDatumWriter. Before sending the byte[], I put the schema ID in the first 4 bytes.

It is not clear why you are trying to bypass the KafkaAvroSerializer class's default behavior. (In your case, remove the Schema.Parser usage from that example, and use your reflected record type rather than GenericRecord.)
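A sketch of that default path: configure the producer with Confluent's KafkaAvroSerializer and let it handle the magic byte, ID registration, and encoding. The schema.reflection flag shown here, which makes the serializer use ReflectData instead of specific/generic records, is only available in newer versions of the Confluent serializers:

import java.util.Properties;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "0:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://0:8081");
// Reflection-based records; available in newer Confluent serializer versions
props.put("schema.reflection", "true");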

You can use your concrete class as the producer's second type parameter, and as long as it works with the base Avro classes it should be serialized correctly (meaning the ID is computed correctly, not some number you create, and converted to bytes), registered with the registry, and then sent to Kafka.

Most importantly, the schema ID is not necessarily 1 in the registry, and by hard-coding it you may cause the console consumer to deserialize your messages with the wrong schema, resulting in the wrong output.
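You can check which ID the registry actually assigned to the subject, for example with the registry's REST API against the URL from the question's setup; the id field of the response is what the console consumer uses to fetch the schema:

curl http://0:8081/subjects/Test-value/versions/latest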

In other words, try

ProducerRecord<String, MParams> record = new ProducerRecord<>(...)
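With the configuration sketched above, the send loop from the question reduces to the following (same assumed props; the serializer registers the schema and frames the bytes, so no manual build() is needed):

KafkaProducer<String, MParams> producer = new KafkaProducer<>(props);

MParams ddb = new MParams();
ddb.setKey("ss");

for (int i = 0; i < 10; i++) {
    producer.send(new ProducerRecord<>("Test", "1", ddb));
}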
