Unable to decode Custom object at Avro Consumer end in Kafka
Problem description
I have a concrete class which I am serializing into a byte array to send to a Kafka topic. For serialization I am using ReflectDatumWriter. Before sending the byte[], I prepend the schema ID in the first 4 bytes, following an online tutorial.
I am able to send the message, but when consuming it with the Avro console consumer I get this output:
./bin/kafka-avro-console-consumer --bootstrap-server 0:9092 --property schema.registry.url=http://0:8081 --property print.key=true --topic Test
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
MParams ddb = new MParams();
ddb.setKey("ss");
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, byte[]> record =
            new ProducerRecord<>("Test", "1", build(1, Producer.serialize(ddb)));
    Future<RecordMetadata> resp = kafkaFullAckProducer.send(record);
    System.out.println("Success" + resp.get());
}
}
public static <T> byte[] serialize(T data) {
    Schema schema = null;
    if (data == null) {
        throw new RuntimeException("Data cannot be null in AvroByteSerializer");
    }
    try {
        schema = ReflectData.get().getSchema(data.getClass());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(data, new EncoderFactory().directBinaryEncoder(out, null));
        byte[] bytes = out.toByteArray();
        return bytes;
    } catch (java.io.IOException e) {
        throw new RuntimeException("Error serializing Avro message", e);
    }
}
public static byte[] build(Integer schemaId, byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0);
    try {
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        out.write(data);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException e) {
        throw new RuntimeException("Exception in avro record builder , msg :" + e.getMessage());
    }
}
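The framing that build attempts to reproduce is the Confluent wire format: one magic byte 0x0, a 4-byte big-endian schema ID, then the Avro-encoded body. A minimal stdlib-only sketch of that framing (WireFormat and its method names are hypothetical, for illustration only):

```java
import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent wire format: magic byte 0x0, 4-byte big-endian schema ID, Avro body.
    public static byte[] frame(int schemaId, byte[] avroBody) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroBody.length);
        buf.put((byte) 0x0);      // magic byte
        buf.putInt(schemaId);     // big-endian by default
        buf.put(avroBody);
        return buf.array();
    }

    // Reads back the schema ID, validating the magic byte first.
    public static int readSchemaId(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(framed.length);        // 8
        System.out.println(readSchemaId(framed)); // 42
    }
}
```

Note that the framing itself is not the hard part; the schema ID must be the one the registry actually assigned, which is why hand-writing it (as in build(1, ...) above) goes wrong.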
@Data
public class MParams extends MetricParams {
    // POJO version
    @Nullable
    private String key;
}

@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "@c")
@Union(value = {MParams.class})
public abstract class MetricParams {
}
Working serializer snippet
public byte[] serialize(String topic, T record) {
    Schema schema;
    int id;
    try {
        schema = ReflectData.get().getSchema(record.getClass());
        id = client.register(topic + "-value", schema);
    } catch (IOException | RestClientException e) {
        throw new RuntimeException(e);
    }
    return serializeImpl(id, schema, record);
}

protected byte[] serializeImpl(int id, Schema schema, T object) throws SerializationException {
    if (object == null) {
        return null;
    }
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x0);
        out.write(ByteBuffer.allocate(4).putInt(id).array());
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(object, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error serializing Avro message", e);
    }
}
Deserializer:
protected T deserialize(Schema schema, byte[] payload) throws SerializationException {
    // Even if the caller requests schema & version, if the payload is null
    // we cannot include it. The caller must handle this case.
    if (payload == null) {
        return null;
    }
    int id = -1;
    try {
        ByteBuffer buffer = getByteBuffer(payload);
        id = buffer.getInt();
        int length = buffer.limit() - 1 - 4;
        int start = buffer.position() + buffer.arrayOffset();
        DatumReader<T> reader = new ReflectDatumReader<T>(schema);
        T res = reader.read(null, new DecoderFactory().binaryDecoder(buffer.array(), start, length, null));
        return res;
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error deserializing Avro message for id " + id, e);
    }
}

private ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != 0x0) {
        throw new SerializationException("Unknown magic byte!");
    }
    return buffer;
}
Answer
"For serializing I am using ReflectDatumWriter. Before sending the byte[] I am prepending the schema ID in the first 4 bytes"
It is not clear why you are trying to bypass the default behavior of the KafkaAvroSerializer class. (In your case, remove the Schema.Parser from that example and use your Reflect record type rather than GenericRecord.)
You can use your concrete class as the producer's second type parameter, and as long as it maps onto the base Avro classes, it will be serialized correctly (meaning the ID is computed by the registry, not a number you invent, and converted to bytes), registered with the registry, and then sent to Kafka.
Most importantly, the schema ID is not necessarily 1 in the registry; by hard-coding it, the console consumer may try to deserialize your messages against the wrong schema, producing the wrong output.
In other words, try
ProducerRecord<String, MParams> record = new ProducerRecord<>(...)
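For that typed producer to work, the producer must be configured with the Confluent KafkaAvroSerializer as the value serializer (and pointed at the registry), so that registration and the wire format are handled for you. A configuration sketch, reusing the broker and registry addresses from the question (the ProducerConfigSketch class name is hypothetical):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Builds producer properties that delegate Avro framing to KafkaAvroSerializer.
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "0:9092");
        props.put("schema.registry.url", "http://0:8081");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("value.serializer"));
    }
}
```

With these properties, new KafkaProducer<String, MParams>(buildProps()) lets the serializer register the reflect schema and prepend the real registry-assigned ID, so no hand-rolled build() method is needed.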