Trouble with deserializing Avro data in Scala
Question
I am building an Apache Flink application in Scala which reads streaming data from a Kafka bus and then performs summarizing operations on it. The data from Kafka is in Avro format and needs a special deserialization class. I found this Scala class, AvroDeserializationSchema (http://codegists.com/snippet/scala/avrodeserializationschemascala_saveveltri_scala):
package org.myorg.quickstart

import org.apache.avro.io.BinaryDecoder
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import org.apache.avro.reflect.ReflectDatumReader
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.common.serialization._

import java.io.IOException

class AvroDeserializationSchema[T](val avroType: Class[T]) extends DeserializationSchema[T] {

  private var reader: DatumReader[T] = null
  private var decoder: BinaryDecoder = null

  override def deserialize(message: Array[Byte]): T = {
    ensureInitialized()
    try {
      decoder = DecoderFactory.get.binaryDecoder(message, decoder)
      reader.read(null.asInstanceOf[T], decoder)
    } catch {
      case e: IOException => throw new RuntimeException(e)
    }
  }

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(avroType)

  private def ensureInitialized(): Unit = {
    if (reader == null) {
      // Use the faster SpecificDatumReader for generated Avro classes,
      // and fall back to reflection for plain classes like a case class.
      reader =
        if (classOf[SpecificRecordBase].isAssignableFrom(avroType)) new SpecificDatumReader[T](avroType)
        else new ReflectDatumReader[T](avroType)
    }
  }
}
In my streaming class I use it as follows:
val stream = env
  .addSource(new FlinkKafkaConsumer010[String]("test",
    new AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))
where DeviceData is a Scala case class defined in the same project:
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double
)
I get the following error when compiling the StreamingKafkaClient.scala class:
Error:(24, 102) object java.lang.Class is not a value
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))
I also tried:
val stream = env
  .addSource(new FlinkKafkaConsumer010[String]("test",
    new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))
With that I get a different error:
Error:(21, 20) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
(x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: java.util.regex.Pattern,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String]
cannot be applied to (String, org.myorg.quickstart.AvroDeserializationSchema[org.myorg.quickstart.DeviceData], java.util.Properties)
.addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))
I am completely new to Scala (this is my first Scala program), so I know I am missing something fundamental here. As I try to learn Scala, could someone please point out what I am doing wrong? My intent is basically to read Avro-encoded data from Kafka into Flink and perform some operations on the streaming data. I could not find any examples of the usage of the AvroDeserializationSchema class; it seems to me this is something that should be natively built into the Flink packages.
Answer
In order to get a class object in Scala, you want classOf[DeviceData], not Class[DeviceData]:
new AvroDeserializationSchema[DeviceData](classOf[DeviceData])
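The distinction is that `Class[DeviceData]` names a type, while `classOf[DeviceData]` is the expression that yields the runtime class object (Scala's counterpart to Java's `DeviceData.class`). A minimal, Flink-free sketch of the difference:

```scala
// The case class from the question.
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double)

object ClassOfDemo extends App {
  // classOf[DeviceData] is a *value* of type Class[DeviceData].
  // Writing `Class[DeviceData]` in expression position fails to compile
  // with "object java.lang.Class is not a value", because Class is a type.
  val cls: Class[DeviceData] = classOf[DeviceData]
  println(cls.getSimpleName) // prints "DeviceData"
}
```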
"I could not find any examples of the usage of the AvroDeserializationSchema class"
Also, it looks like the Flink 1.6 release will add this class natively, rather than you having to copy it from elsewhere. See FLINK-9337 & FLINK-9338.
As mentioned in the comments, if you would like to use the Confluent Avro Schema Registry rather than giving a class type, see this answer, or refer to the code in the Github link above.
Additionally, if you are running Kafka 0.11+ (or Confluent 3.3+), then you should ideally be using FlinkKafkaConsumer011 along with the class you are deserializing to:
new FlinkKafkaConsumer011[DeviceData]
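Putting it together, here is a sketch of the full consumer wiring. Note that the consumer's type parameter must match the schema's element type; the second compile error above comes from passing a DeserializationSchema[DeviceData] to a FlinkKafkaConsumer010[String]. The topic name "test" and the `properties` object come from the question; the bootstrap-server and group-id values below are placeholder assumptions.

```scala
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Sketch only: assumes the AvroDeserializationSchema and DeviceData classes
// shown above are on the classpath, and a reachable Kafka broker.
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092") // placeholder
properties.setProperty("group.id", "device-data-group")        // placeholder

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Type parameter is DeviceData, not String, so it lines up with the schema.
val stream: DataStream[DeviceData] = env.addSource(
  new FlinkKafkaConsumer011[DeviceData](
    "test",
    new AvroDeserializationSchema[DeviceData](classOf[DeviceData]),
    properties))
```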