Trouble with deserializing Avro data in Scala


Problem description

I am building an Apache Flink application in Scala which reads streaming data from a Kafka bus and then performs summarizing operations on it. The data from Kafka is in Avro format and needs a special deserialization class. I found this Scala class, AvroDeserializationSchema (http://codegists.com/snippet/scala/avrodeserializationschemascala_saveveltri_scala):

package org.myorg.quickstart
import org.apache.avro.io.BinaryDecoder
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import org.apache.avro.reflect.ReflectDatumReader
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.common.serialization._
import java.io.IOException

class AvroDeserializationSchema[T](val avroType: Class[T]) extends DeserializationSchema[T] {
  // Avro readers and decoders are not serializable, so they start out null
  // and are created lazily on the task, after Flink ships this schema object.
  private var reader: DatumReader[T] = null
  private var decoder: BinaryDecoder = null

  def deserialize(message: Array[Byte]): T = {
    ensureInitialized()
    try {
      // Passing the previous decoder lets the factory reuse it across calls.
      decoder = DecoderFactory.get.binaryDecoder(message, decoder)
      reader.read(null.asInstanceOf[T], decoder)
    } catch {
      case e: IOException => throw new RuntimeException(e)
    }
  }

  // The Kafka stream is unbounded, so never signal end-of-stream.
  def isEndOfStream(nextElement: T): Boolean = false

  def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(avroType)

  private def ensureInitialized() {
    if (reader == null) {
      // Classes generated from an Avro schema get the specific reader;
      // anything else (such as a plain case class) falls back to reflection.
      if (classOf[SpecificRecordBase].isAssignableFrom(avroType)) {
        reader = new SpecificDatumReader[T](avroType)
      } else {
        reader = new ReflectDatumReader[T](avroType)
      }
    }
  }
}

In my streaming class I use it as follows:

val stream = env
  .addSource(new FlinkKafkaConsumer010[String]("test",
    new AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))

where DeviceData is a Scala case class defined in the same project:

/** Case class to hold the device data. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double)

I get the following error when compiling the StreamingKafkaClient.scala class:

Error:(24, 102) object java.lang.Class is not a value
        .addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))

I also tried:

val stream = env
  .addSource(new FlinkKafkaConsumer010[String]("test",
    new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))

With that I get a different error:

Error:(21, 20) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
  (x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: java.util.regex.Pattern,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String]
 cannot be applied to (String, org.myorg.quickstart.AvroDeserializationSchema[org.myorg.quickstart.DeviceData], java.util.Properties)
        .addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))

I am completely new to Scala (this is my first Scala program), so I know I am missing something fundamental here. As I try to learn Scala, could someone please point out what I am doing wrong? My intent is basically to read Avro-encoded data from Kafka into Flink and do some operations on the streaming data. I could not find any examples of the usage of the AvroDeserializationSchema class; it seems to me this is something that should be natively built into the Flink packages.

Solution

In order to get a class object in Scala, you want classOf[DeviceData], not Class[DeviceData]:

new AvroDeserializationSchema[DeviceData](classOf[DeviceData])
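
That fixes the first compile error. The second error is a separate problem: the consumer was declared as FlinkKafkaConsumer010[String], so every constructor overload expects a DeserializationSchema[String], while the schema being passed produces DeviceData. The consumer's type parameter has to match the schema's produced type, roughly like this (a sketch based on the question's code):

// The type parameter matches what the schema produces, so this
// source yields a DataStream[DeviceData] rather than strings.
val stream = env.addSource(
  new FlinkKafkaConsumer010[DeviceData](
    "test",
    new AvroDeserializationSchema[DeviceData](classOf[DeviceData]),
    properties))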

"I could not find any examples of the usage of AvroDeserializationSchema class"

I found one (in Java).

Also, it looks like the Flink 1.6 release will add this class natively rather than you having to copy it from elsewhere; see FLINK-9337 & FLINK-9338.
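
For reference, a minimal sketch of what using that built-in class looks like, assuming the flink-avro artifact is on the classpath; the inline schema below is hand-written to mirror the DeviceData case class and is my assumption, not something from the question:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.AvroDeserializationSchema

// Hand-written Avro schema mirroring the DeviceData case class (assumption).
val deviceSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "DeviceData", "fields": [
    |  {"name": "deviceId",   "type": "string"},
    |  {"name": "sw_version", "type": "string"},
    |  {"name": "timestamp",  "type": "string"},
    |  {"name": "reading",    "type": "double"}
    |]}""".stripMargin)

// forGeneric deserializes into Avro GenericRecord. forSpecific also exists,
// but it requires a class generated from an Avro schema (a SpecificRecord),
// which a plain Scala case class is not.
val genericSchema: AvroDeserializationSchema[GenericRecord] =
  AvroDeserializationSchema.forGeneric(deviceSchema)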

As mentioned in the comments, if you would like to use the Confluent Avro Schema Registry rather than giving a class type, see this answer, or refer to the code in the above GitHub link.
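
A short sketch of the registry route, assuming the flink-avro-confluent-registry artifact (also new in Flink 1.6) and a registry URL that is purely illustrative:

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema

// Looks up each record's writer schema in the registry instead of taking a
// class type; deviceSchema is the reader schema from the sketch above, and
// the registry URL is an assumption.
val registrySchema =
  ConfluentRegistryAvroDeserializationSchema.forGeneric(
    deviceSchema,
    "http://localhost:8081")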

Additionally, if you are running Kafka 0.11+ (or Confluent 3.3+), then you should ideally be using FlinkKafkaConsumer011 along with the class you are deserializing to:

new FlinkKafkaConsumer011[DeviceData]
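
Putting it all together, a hedged sketch of the full source, assuming the flink-connector-kafka-0.11 dependency and the same topic as the question; the broker and group settings are placeholders:

// 0.11 consumer typed to the schema's produced type.
val properties = new java.util.Properties()
properties.setProperty("bootstrap.servers", "localhost:9092") // placeholder
properties.setProperty("group.id", "device-data-group")       // placeholder

val stream = env.addSource(
  new FlinkKafkaConsumer011[DeviceData](
    "test",
    new AvroDeserializationSchema[DeviceData](classOf[DeviceData]),
    properties))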
