Trouble with deserializing Avro data in Scala
Question
I am building an Apache Flink application in Scala which reads streaming data from a Kafka bus and then performs summarizing operations on it. The data from Kafka is in Avro format and needs a special deserialization class. I found this Scala class, AvroDeserializationSchema (http://codegists.com/snippet/scala/avrodeserializationschemascala_saveveltri_scala):
package org.myorg.quickstart

import org.apache.avro.io.{BinaryDecoder, DatumReader, DecoderFactory}
import org.apache.avro.reflect.ReflectDatumReader
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.serialization._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import java.io.IOException

class AvroDeserializationSchema[T](val avroType: Class[T]) extends DeserializationSchema[T] {

  // Lazily initialized; the decoder is reused across calls to avoid reallocation.
  private var reader: DatumReader[T] = null
  private var decoder: BinaryDecoder = null

  def deserialize(message: Array[Byte]): T = {
    ensureInitialized()
    try {
      decoder = DecoderFactory.get.binaryDecoder(message, decoder)
      reader.read(null.asInstanceOf[T], decoder)
    } catch {
      case e: IOException => throw new RuntimeException(e)
    }
  }

  def isEndOfStream(nextElement: T): Boolean = false

  def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(avroType)

  private def ensureInitialized(): Unit = {
    if (reader == null) {
      // Code-generated Avro classes extend SpecificRecordBase; anything else
      // (e.g. a plain case class) falls back to the reflective reader.
      reader =
        if (classOf[SpecificRecordBase].isAssignableFrom(avroType))
          new SpecificDatumReader[T](avroType)
        else
          new ReflectDatumReader[T](avroType)
    }
  }
}
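The reader selection in ensureInitialized() hinges on Class.isAssignableFrom. A minimal, Avro-free sketch of that check (GeneratedRecord, Telemetry, and PlainPojo are hypothetical stand-ins, not Avro or Flink types):

```scala
// GeneratedRecord stands in for Avro's SpecificRecordBase; Telemetry for a
// code-generated Avro class; PlainPojo for an ordinary class or case class.
trait GeneratedRecord
class Telemetry extends GeneratedRecord
class PlainPojo

// Mirrors the branch in ensureInitialized(): specific reader for generated
// classes, reflective reader for everything else.
def pickReader(avroType: Class[_]): String =
  if (classOf[GeneratedRecord].isAssignableFrom(avroType)) "SpecificDatumReader"
  else "ReflectDatumReader"

println(pickReader(classOf[Telemetry])) // SpecificDatumReader
println(pickReader(classOf[PlainPojo])) // ReflectDatumReader
```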
In my streaming class I use this as follows:
val stream = env
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))
where DeviceData is a Scala case class defined in the same project:
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double
)
I get the following error when compiling the StreamingKafkaClient.scala class:
Error:(24, 102) object java.lang.Class is not a value
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))
I also tried
val stream = env
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))
With that, I get a different error:
Error:(21, 20) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
(x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: java.util.regex.Pattern,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String]
cannot be applied to (String, org.myorg.quickstart.AvroDeserializationSchema[org.myorg.quickstart.DeviceData], java.util.Properties)
.addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))
I am completely new to Scala (this is my first Scala program), so I know I am missing something fundamental here. As I try to learn Scala, could someone please point out what I am doing wrong? My intent is basically to read Avro-encoded data from Kafka into Flink and perform some operations on the streaming data. I could not find any examples of the usage of the AvroDeserializationSchema class; it seems to me this is something that should be natively built into Flink packages.
Answer

In order to get a class object in Scala, you want classOf[DeviceData], not Class[DeviceData]:
new AvroDeserializationSchema[DeviceData](classOf[DeviceData])
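To see the difference in isolation, here is a minimal, Flink-free sketch (DeviceData is repeated from the question): classOf[T] is a value of type Class[T], whereas Class[DeviceData] on its own names a type, not a value, which is exactly what the first compiler error is complaining about.

```scala
// classOf[T] is Scala's equivalent of Java's T.class literal: it is a value
// of type Class[T]. Writing Class[DeviceData] names a type, not a value,
// hence "object java.lang.Class is not a value".
case class DeviceData(deviceId: String, sw_version: String, timestamp: String, reading: Double)

val clazz: Class[DeviceData] = classOf[DeviceData]
println(clazz.getName) // fully qualified name ending in "DeviceData"

// The runtime class of an instance is the same object:
val sample = DeviceData("dev-1", "1.0", "2018-01-01T00:00:00Z", 3.14)
println(sample.getClass == clazz) // true
```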
"I could not find any examples of the usage of AvroDeserializationSchema class"
Also, it looks like the Flink 1.6 release will add this class natively rather than requiring you to copy it from elsewhere; see FLINK-9337 & FLINK-9338.
As mentioned in the comments, if you would like to use the Confluent Avro Schema Registry rather than giving a class type, see this answer, or refer to the code in the above GitHub link.
Additionally, if you are running Kafka 0.11+ (or Confluent 3.3+), then you should ideally be using FlinkKafkaConsumer011, parameterized with the class you are deserializing to. Note that the consumer's type parameter must match the element type the schema produces, which is why FlinkKafkaConsumer010[String] rejected a DeserializationSchema[DeviceData]:
new FlinkKafkaConsumer011[DeviceData]
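The second compiler error boils down to that type-parameter mismatch. A stripped-down model of the constraint, with hypothetical Schema and Consumer classes standing in for Flink's DeserializationSchema and FlinkKafkaConsumer010 (no Flink or Kafka required):

```scala
case class DeviceData(deviceId: String, sw_version: String, timestamp: String, reading: Double)

// Hypothetical stand-ins for DeserializationSchema[T] and FlinkKafkaConsumer010[T].
trait Schema[T] { def deserialize(bytes: Array[Byte]): T }
class Consumer[T](topic: String, schema: Schema[T]) {
  def poll(bytes: Array[Byte]): T = schema.deserialize(bytes)
}

// A toy schema that just puts the raw bytes into the deviceId field.
val deviceSchema: Schema[DeviceData] = new Schema[DeviceData] {
  def deserialize(bytes: Array[Byte]): DeviceData =
    DeviceData(new String(bytes, "UTF-8"), "1.0", "0", 0.0)
}

// Compiles: the consumer's type parameter matches the schema's element type.
val consumer = new Consumer[DeviceData]("test", deviceSchema)
println(consumer.poll("device-42".getBytes("UTF-8")).deviceId) // device-42

// Would NOT compile, mirroring the question's overload error:
// val bad = new Consumer[String]("test", deviceSchema)
```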