Apache Flink read Avro byte[] from Kafka


Question

In reviewing examples I see a lot of this:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

I see that here they already know the schema.

I do not know the schema until I read the byte[] into a GenericRecord and then get the schema from it (as it may change from record to record).

Can someone point me to a FlinkKafkaConsumer08 that reads from byte[] into a map filter, so that I can remove some leading bits and then load that byte[] into a GenericRecord?

Answer

I'm doing something similar (I'm using the 09 consumer).

In your main code, pass in your custom deserializer:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
                parameterTool.getProperties());
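
For context, here is a hedged sketch of the surrounding setup showing the consumer attached as a regular Flink source; the broker address, group id, and job name are placeholders for illustration, not values taken from the original question or answer:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class ReadAvroFromKafka {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        properties.setProperty("group.id", "avro-reader");             // placeholder consumer group

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // wire the custom deserialization schema into the Kafka consumer
        FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                "myavrotopic", new MyDeserializationSchema<>(), properties);

        DataStream<Object> stream = env.addSource(kafkaConsumer);
        stream.print();

        env.execute("read avro byte[] from kafka");
    }
}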

The custom DeserializationSchema reads the bytes, figures out the schema and/or retrieves it from a schema registry, deserializes the payload into a GenericRecord, and returns the GenericRecord object.

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
// note: in newer Flink versions this interface lives in org.apache.flink.api.common.serialization
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class MyDeserializationSchema<T> implements DeserializationSchema<T> {

    private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;

    @Override
    public T deserialize(byte[] arg0) throws IOException {
        // do your stuff here: strip off your leading bytes,
        // then deserialize the remainder and build your GenericRecord
        return (T) (myavroevent);   // placeholder for the GenericRecord you create
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avrotype);
    }

}
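
What goes inside deserialize() depends on your byte layout. Below is a minimal sketch of one way to fill it in, assuming the payload starts with a fixed-length header that must be stripped and that the writer schema is available (for example, fetched once from a schema registry); the class name, header length, and schema source are assumptions for illustration, not part of the original answer.

import java.io.IOException;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class GenericRecordDecoder {

    private static final int HEADER_BYTES = 5;  // hypothetical number of leading bytes to strip
    private final Schema writerSchema;          // obtained elsewhere (file, schema registry, ...)

    public GenericRecordDecoder(Schema writerSchema) {
        this.writerSchema = writerSchema;
    }

    public GenericRecord decode(byte[] message) throws IOException {
        // drop the leading header bytes before handing the remainder to Avro
        byte[] avroBody = Arrays.copyOfRange(message, HEADER_BYTES, message.length);

        // decode the Avro binary body into a GenericRecord using the writer schema
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroBody, null);
        return reader.read(null, decoder);
    }
}

Inside MyDeserializationSchema.deserialize() you would then call something like new GenericRecordDecoder(schema).decode(arg0) (or cache the decoder in a field) and return the resulting GenericRecord cast to T.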
