KafkaAvroDeserializer不返回SpecificRecord但返回GenericRecord [英] KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord
问题描述
我的 KafkaProducer
能够使用 KafkaAvroSerializer
将对象序列化到我的主题。但是, KafkaConsumer.poll()
返回反序列化的 GenericRecord
而不是我的序列化类。
My KafkaProducer
is able to use KafkaAvroSerializer
to serialize objects to my topic. However, KafkaConsumer.poll()
returns deserialized GenericRecord
instead of my serialized class.
MyKafkaProducer
MyKafkaProducer
KafkaProducer<CharSequence, MyBean> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
MyBean bean = new MyBean();
producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
我的KafkaConsumer
My KafkaConsumer
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
properties.load(props);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
if (records.isEmpty()) {
continue;
}
for (ConsumerRecord<CharSequence, MyBean> record : records) {
MyBean bean = record.value(); // <-------- This is throwing a cast Exception because it cannot cast GenericRecord to MyBean
System.out.println("consumer received: " + bean);
}
}
MyBean bean = record。 value();
该行抛出一个强制转换异常,因为它无法将GenericRecord强制转换为MyBean。
MyBean bean = record.value();
That line throws a cast Exception because it cannot cast GenericRecord to MyBean.
我正在使用 kafka-client-0.9.0.1
, kafka-avro-serializer-3.0.0
。
推荐答案
KafkaAvroDeserializer支持SpecificData
默认情况下不启用它。启用它:
KafkaAvroDeserializer supports SpecificData
It's not enabled by default. To enable it:
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
KafkaAvroDeserializer不支持ReflectData
Confluent's KafkaAvroDeserializer
不知道如何使用Avro ReflectData反序列化。我不得不扩展它以支持Avro ReflectData:
KafkaAvroDeserializer does not support ReflectData
Confluent's KafkaAvroDeserializer
does not know how to deserialize using Avro ReflectData. I had to extend it to support Avro ReflectData:
/**
* Extends deserializer to support ReflectData.
*
* @param <V>
* value type
*/
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {
private Schema readerSchema;
private DecoderFactory decoderFactory = DecoderFactory.get();
protected ReflectKafkaAvroDeserializer(Class<V> type) {
readerSchema = ReflectData.get().getSchema(type);
}
@Override
protected Object deserialize(
boolean includeSchemaAndVersion,
String topic,
Boolean isKey,
byte[] payload,
Schema readerSchemaIgnored) throws SerializationException {
if (payload == null) {
return null;
}
int schemaId = -1;
try {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte!");
}
schemaId = buffer.getInt();
Schema writerSchema = schemaRegistry.getByID(schemaId);
int start = buffer.position() + buffer.arrayOffset();
int length = buffer.limit() - 1 - idSize;
DatumReader<Object> reader = new ReflectDatumReader(writerSchema, readerSchema);
BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
return reader.read(null, decoder);
} catch (IOException e) {
throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
} catch (RestClientException e) {
throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
}
}
}
定义一个自定义反序列化器类反序列化为 MyBean
:
Define a custom deserializer class which deserializes to MyBean
:
public class MyBeanDeserializer extends ReflectKafkaAvroDeserializer<MyBean> {
public MyBeanDeserializer() {
super(MyBean.class);
}
}
配置 KafkaConsumer
使用自定义反序列化器类:
Configure KafkaConsumer
to use the custom deserializer class:
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);
这篇关于KafkaAvroDeserializer不返回SpecificRecord但返回GenericRecord的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!