java.lang.InstantiationException while deserializing a byte stream into a Scala case class object


Question


I am trying to deserialize an Avro byte stream into a Scala case class object. Basically, I had a Kafka stream with Avro-encoded data flowing through it; now there is an addition to the schema, and I am trying to update the Scala case class to include the new field. The case class looks like this:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double,
                      new_field: Option[String] = None) {

  def this() = this("na", "na", "na", 0, None)
}
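As a side note, the `Option[String] = None` default is what keeps the new field backward compatible at the Scala level: old call sites that do not pass `new_field` still compile and get `None`. A minimal standalone sketch (with its own copy of the case class, and hypothetical sample values):

```scala
// Standalone copy of the case class with the optional new field.
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double,
                      new_field: Option[String] = None) {
  // No-arg auxiliary constructor, as needed by reflection-based deserializers.
  def this() = this("na", "na", "na", 0, None)
}

// An old-style call site keeps compiling: new_field falls back to None.
val legacy = DeviceData("dev-1", "1.0", "2019-01-01T00:00:00Z", 21.5)
// A new-style record can carry the extra field.
val updated = legacy.copy(new_field = Some("extra"))

println(legacy.new_field)   // None
println(updated.new_field)  // Some(extra)
```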

The Avro schema is as follows:

{
  "type": "record",
  "name": "some_name",
  "namespace": "some_namespace",
  "fields": [
    {
      "name": "deviceId",
      "type": "string"
    },
    {
      "name": "sw_version",
      "type": "string"
    }, 
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "reading",
      "type": "double"
    },
    {
      "name": "new_field",
      "type": ["null", "string"],
      "default": null
    }
  ]
}


When the data is received, I get the following exception:

java.lang.RuntimeException: java.lang.InstantiationException


I can receive the data just fine with a consumer written in Python, so I know that the data is being streamed correctly in the correct format. I suspect the problem is with the creation of the case class constructor; I have tried doing this:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double,
                      new_field: Option[String]) {

  def this() = this("na", "na", "na", 0, Some("na"))
}

but with no luck.
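For context on why the no-arg constructor matters: reflection-based deserializers typically instantiate the target class through something equivalent to `Class.newInstance()`, which throws `java.lang.InstantiationException` when the class has no nullary constructor. A minimal sketch (hypothetical class names) showing which shape fails and which succeeds:

```scala
// A case class WITHOUT a no-arg constructor: reflective instantiation fails.
case class NoDefault(id: String)

// A case class WITH a no-arg auxiliary constructor: reflective instantiation works.
case class WithDefault(id: String) {
  def this() = this("na")
}

// Equivalent of what a reflection-based deserializer does internally.
val ok = classOf[WithDefault].getDeclaredConstructor().newInstance()
println(ok.id)  // na

// Class.newInstance() (deprecated, but it mirrors the exception in the question)
// throws InstantiationException when there is no nullary constructor.
val failed =
  try { classOf[NoDefault].newInstance(); false }
  catch { case _: InstantiationException => true }
println(failed)  // true
```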


The deserializer code is (excerpts):

// Reader and decoder for reading Avro records
private var reader: DatumReader[T] = null
private var decoder: BinaryDecoder = null
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)


I could not find any other examples of constructors for case classes used for deserializing Avro. I had posted a related question last year, java.lang.NoSuchMethodException for init method in Scala case class, and based on the response I was able to implement my current code, which has been working fine ever since.

Answer


I resolved this problem by following a totally different approach. I used the Confluent Kafka client as provided in this example: https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink. I also have a Confluent schema registry, which is really easy to set up using the containerized all-in-one solution that comes with Kafka and a schema registry: https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html.


I had to add the Confluent dependencies and repository to my pom.xml file. This goes in the repositories section:

<repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
</repository>

And this goes in the dependencies section:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <!-- For Confluent Platform 5.2.1 -->
    <version>5.2.1</version>
</dependency>


With the code provided in https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala I was able to talk to the Confluent schema registry; then, based on the schema id in the Avro message header, it downloads the schema from the registry and gives me back a GenericRecord object, from which I can easily extract any and all fields of interest and create a new DataStream of DeviceData objects.

val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
  new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
  properties)
val device_data_stream = env
  .addSource(kafka_consumer)
  .map({x => new DeviceData(x.get("deviceId").toString,
    x.get("sw_version").toString,
    x.get("timestamp").toString,
    x.get("reading").toString.toDouble,
    Option(x.get("new_field")).map(_.toString))})
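One detail worth noting in the map above: `new_field` is nullable in the schema, so `x.get("new_field")` can return null, and calling `.toString` on it directly would throw a NullPointerException. Wrapping the lookup in `Option(...)` turns null into `None`. A sketch of that pattern, with a plain `java.util.Map` standing in for the GenericRecord:

```scala
// Stand-in for GenericRecord.get: returns null when the optional field is absent.
val record: java.util.Map[String, AnyRef] = new java.util.HashMap()
record.put("deviceId", "dev-1")
// "new_field" deliberately left unset, so get("new_field") returns null.

// Option(...) converts null to None instead of blowing up on .toString.
val newField: Option[String] = Option(record.get("new_field")).map(_.toString)
val deviceId: String = record.get("deviceId").toString

println(deviceId)  // dev-1
println(newField)  // None
```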


The Confluent Kafka client takes care of deserializing the Avro byte stream as per the schema, including the default values. Setting up the schema registry and using the Confluent Kafka client may take a little time to get used to, but it is probably the better long-term solution. Just my 2 cents.
