如何使用 Avro 和 Flink 解码 Kafka 消息 [英] How to decode Kafka messages using Avro and Flink

查看:39
本文介绍了如何使用 Avro 和 Flink 解码 Kafka 消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 Flink 1.0.3 从 Kafka 主题中读取 AVRO 数据.

我只知道这个特定的 Kafka 主题有 AVRO 编码的消息,而我有 AVRO 架构文件.

<块引用>

我的 Flink 代码:

public static void main(String[] args) 抛出异常 {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();属性 properties = new Properties();properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");properties.setProperty("group.id", "Zeeshantest");AvroDeserializationSchema<事件>avroSchema = 新的 AvroDeserializationSchema<>(Event.class);FlinkKafkaConsumer08<事件>kafkaConsumer = new FlinkKafkaConsumer08("myavrotopic", avroSchema, properties);数据流<事件>messageStream = env.addSource(kafkaConsumer);messageStream.rebalance().print();env.execute("Flink AVRO KAFKA 测试");}

我已经使用 avro 工具和架构rocana.avsc"创建了我的 Event.java 文件

java -jar/path/to/avro-tools-1.8.1.jar 编译模式 rocana.avsc

这是上传到 github 的 rocana.avsc 文件.><块引用>

AvroDeserializationSchema.java

import org.apache.avro.io.BinaryDecoder;导入 org.apache.avro.io.DatumReader;导入 org.apache.avro.io.DecoderFactory;导入 org.apache.avro.reflect.ReflectDatumReader;导入 org.apache.avro.specific.SpecificDatumReader;导入 org.apache.flink.api.common.typeinfo.TypeInformation;导入 org.apache.flink.api.java.typeutils.TypeExtractor;导入 org.apache.flink.streaming.util.serialization.DeserializationSchema;公共类 AvroDeserializationSchema实现 DeserializationSchema<T>{私有静态最终长serialVersionUID = 4330538776656642778L;私人final Class<T>avrotype;私有瞬态 DatumReader<T>读者;私有瞬态 BinaryDecoder 解码器;公共 AvroDeserializationSchema(Class avroType) {this.avroType = avroType;}@覆盖公共 T 反序列化(字节 [] 消息){确保初始化();尝试 {解码器 = DecoderFactory.get().binaryDecoder(message,decoder);返回 reader.read(null,decoder);} 捕获(异常 e){抛出新的运行时异常(e);}}@覆盖公共布尔 isEndOfStream(T nextElement) {返回假;}@覆盖公共类型信息<T>getProducedType() {返回 TypeExtractor.getForClass(avroType);}私有无效确保初始化(){如果(读者==空){如果 (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {reader = new SpecificDatumReader(avroType);} 别的 {reader = new ReflectDatumReader(avroType);}}}}

在运行我的程序时出现以下错误:

17:25:30,759 INFO org.apache.zookeeper.ZooKeeper - 会话:0x356350cb9001857 关闭17:25:30,759 信息 org.apache.zookeeper.ClientCnxn - EventThread 关闭17:25:30,761 INFO org.apache.flink.runtime.taskmanager.Task - 来源:自定义来源 (3/4) 切换到 FAILED 异常.java.lang.Exception: 2在 org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)在 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)在 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)在 org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)在 java.lang.Thread.run(未知来源)引起:java.lang.ArrayIndexOutOfBoundsException:2在 org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)在 org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)在 org.apache.avro.io.parsing.Parser.advance(Parser.java:88)在 org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)在 org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)在 org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)在 org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)在 org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)在 org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)在 org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)在 com.google.cloud.dataflow.sdk.coders.AvroCoder.decode(AvroCoder.java:274)在 org.fmr.flink.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:52)在 org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)在 org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657)17:25:30,769 信息 org.apache.flink.runtime.taskmanager.Task - 为源释放任务资源:自定义源 (3/4)17:25:30,776 INFO org.apache.flink.runtime.taskmanager.Task - 接收器:未命名(1/4)切换

我认为我的反序列化代码不正确.有谁知道我做错了什么?这是使用 Flink 从 Kafka 读取 AVRO 数据的方法还是有更好的方法?

解决方案

我根据我的问题发布的任何代码都可以正常工作.

问题是发送到 kafka 主题的数据,JSON 和 AVRO 格式的数据都发送到那里.我订阅了一个不同的 Kafka 主题,其中数据仅在 AVRO 中并且我的代码运行良好.

I am trying to read AVRO data from a Kafka topic using Flink 1.0.3.

I just know that this particular Kafka topic is having AVRO encoded message and I am having the AVRO schema file.

My Flink code:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
        properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
        properties.setProperty("group.id", "Zeeshantest");
        AvroDeserializationSchema<Event> avroSchema = new AvroDeserializationSchema<>(Event.class);
        FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
        DataStream<Event> messageStream = env.addSource(kafkaConsumer);
        messageStream.rebalance().print();
        env.execute("Flink AVRO KAFKA Test");
    }

I have created my Event.java file using the avro tools and schema "rocana.avsc"

java -jar /path/to/avro-tools-1.8.1.jar compile schema rocana.avsc

Here is rocana.avsc file uploaded in github.

AvroDeserializationSchema.java

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}

On running my program I am getting the following error:

17:25:30,759 INFO  org.apache.zookeeper.ZooKeeper                                - Session: 0x356350cb9001857 closed
17:25:30,759 INFO  org.apache.zookeeper.ClientCnxn                               - EventThread shut down
17:25:30,761 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (3/4) switched to FAILED with exception.
java.lang.Exception: 2
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.decode(AvroCoder.java:274)
    at org.fmr.flink.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:52)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657)
17:25:30,769 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (3/4)
17:25:30,776 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Unnamed (1/4) switched

I think my deserialize code is not correct. Do anyone know what I am doing wrong? Is this the way to read AVRO data from Kafka using Flink or there is a better way around?

解决方案

Whatever code I posted as per my question works perfectly fine.

Issue was with the data sent to kafka topic, both JSON and AVRO formatted data was sent there. I subscribed to a different Kafka topic where data was only in AVRO and my code worked fine.

这篇关于如何使用 Avro 和 Flink 解码 Kafka 消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆