How to decode Kafka messages using Avro and Flink
Question
I am trying to read Avro data from a Kafka topic using Flink 1.0.3.
All I know is that this particular Kafka topic contains Avro-encoded messages, and I have the Avro schema file.
My Flink code:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
    properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
    properties.setProperty("group.id", "Zeeshantest");

    AvroDeserializationSchema<Event> avroSchema = new AvroDeserializationSchema<>(Event.class);
    FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

    DataStream<Event> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();

    env.execute("Flink AVRO KAFKA Test");
}
I generated my Event.java file with avro-tools from the schema "rocana.avsc":
java -jar /path/to/avro-tools-1.8.1.jar compile schema rocana.avsc
The rocana.avsc file is uploaded on GitHub.
AvroDeserializationSchema.java:
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;

    // Transient because the schema object is serialized and shipped to the
    // task managers; reader and decoder are created lazily on first use.
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            // Passing the previous decoder lets Avro reuse it instead of allocating a new one.
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        // The Kafka stream is unbounded.
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            // Generated classes extend SpecificRecordBase; anything else is read via reflection.
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}
When I run the program, I get the following error:
17:25:30,759 INFO org.apache.zookeeper.ZooKeeper - Session: 0x356350cb9001857 closed
17:25:30,759 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
17:25:30,761 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (3/4) switched to FAILED with exception.
java.lang.Exception: 2
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at com.google.cloud.dataflow.sdk.coders.AvroCoder.decode(AvroCoder.java:274)
at org.fmr.flink.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:52)
at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657)
17:25:30,769 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (3/4)
17:25:30,776 INFO org.apache.flink.runtime.taskmanager.Task - Sink: Unnamed (1/4) switched
I think my deserialization code is incorrect. Does anyone know what I am doing wrong? Is this the right way to read Avro data from Kafka using Flink, or is there a better approach?
Recommended answer
The code I posted in my question works perfectly fine.
The issue was with the data sent to the Kafka topic: both JSON- and Avro-formatted messages were being sent there. I subscribed to a different Kafka topic where the data was only in Avro, and my code worked fine.
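For anyone hitting the same exception, here is a rough way to check whether a topic carries mixed payloads. It is only a sketch, and it assumes the JSON producers send UTF-8 text starting with { or [. The class PayloadSniffer and its methods looksLikeJson and readZigZagLong are hypothetical helper names, not part of Flink or Avro. Avro's binary encoding writes ints, longs, string lengths and union branch indexes as zig-zag varints, so the second method shows what number a binary decoder would read from the first bytes of a message. A JSON payload fed to it yields a meaningless value, which is consistent with (though not proof of) the out-of-range union index in the stack trace.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Flink or Avro.
final class PayloadSniffer {

    private PayloadSniffer() {}

    // Heuristic: true if the first non-whitespace byte is '{' or '['.
    // Assumption: JSON producers send objects or arrays as UTF-8 text.
    // A valid Avro record can also start with these bytes, so treat a
    // positive result as a hint, not as proof.
    public static boolean looksLikeJson(byte[] payload) {
        for (byte b : payload) {
            if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
                continue;
            }
            return b == '{' || b == '[';
        }
        return false; // empty or whitespace-only payload
    }

    // Decodes the first zig-zag varint in buf, the encoding Avro uses for
    // int/long values, string lengths and union branch indexes.
    public static long readZigZagLong(byte[] buf) {
        long raw = 0;
        int shift = 0;
        for (byte b : buf) {
            raw |= (long) (b & 0x7F) << shift;   // low 7 bits carry data
            if ((b & 0x80) == 0) {               // high bit clear: last byte
                return (raw >>> 1) ^ -(raw & 1); // undo zig-zag
            }
            shift += 7;
            if (shift > 63) {
                break; // more than 10 bytes cannot be a valid long
            }
        }
        throw new IllegalArgumentException("truncated or overlong varint");
    }

    public static void main(String[] args) {
        byte[] json = "{\"id\":\"abc\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(looksLikeJson(json));  // prints "true"
        // '{' is 0x7B, which a binary decoder would read as -62,
        // e.g. a nonsensical negative string length.
        System.out.println(readZigZagLong(json)); // prints "-62"
    }
}
```

In a job like the one in the question, a check of this kind could run inside deserialize before the Avro decoder is invoked, for example to log and count suspicious messages. How best to skip a bad record depends on the Flink version, so that part is left out here.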