java.lang.InstantiationException while deserializing a byte stream into a Scala case class object

Problem description

I am trying to deserialize an Avro byte stream into a Scala case class object. Basically, I have a Kafka stream carrying Avro-encoded data; there has now been an addition to the schema, and I am trying to update the Scala case class to include the new field. The case class looks like this:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double,
                      new_field: Option[String] = None) {

  /** Zero-argument auxiliary constructor, needed for reflective instantiation. */
  def this() = this("na", "na", "na", 0.0, None)
}

The Avro schema is as follows:

{
  "type": "record",
  "name": "some_name",
  "namespace": "some_namespace",
  "fields": [
    {
      "name": "deviceId",
      "type": "string"
    },
    {
      "name": "sw_version",
      "type": "string"
    }, 
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "reading",
      "type": "double"
    },
    {
      "name": "new_field",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
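As a quick sanity check (not in the original question), the evolved schema can be parsed with Avro's Schema.Parser to confirm that new_field is a nullable union with a null default; a minimal sketch with the schema inlined as a string:

import org.apache.avro.Schema

// The evolved schema from above, inlined for a self-contained check.
val schemaJson =
  """{"type":"record","name":"some_name","namespace":"some_namespace","fields":[
    |{"name":"deviceId","type":"string"},{"name":"sw_version","type":"string"},
    |{"name":"timestamp","type":"string"},{"name":"reading","type":"double"},
    |{"name":"new_field","type":["null","string"],"default":null}]}""".stripMargin

val schema: Schema = new Schema.Parser().parse(schemaJson)
// Prints the union type of the evolved field: ["null","string"]
println(schema.getField("new_field").schema())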

When the data is received, I get the following exception:

java.lang.RuntimeException: java.lang.InstantiationException
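For context: java.lang.InstantiationException is what Class.newInstance() throws when the target class has no accessible zero-argument constructor, and Avro's reflection-based readers instantiate record classes that way, which would explain the RuntimeException wrapper above. A minimal sketch that reproduces the same exception (hypothetical class, not the pipeline code):

// Hypothetical class with no zero-argument constructor.
case class NoDefaultCtor(value: String)

// Class.newInstance() throws java.lang.InstantiationException when there is
// no nullary constructor to call.
classOf[NoDefaultCtor].newInstance()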

I can receive the data just fine with a consumer written in Python, so I know that the data is being streamed correctly in the correct format. I suspect the problem is with the creation of the case class constructor; I have tried doing this:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double,
                      new_field: Option[String]) {

  def this() = this("na", "na", "na", 0.0, Some("na"))
}

But with no luck.

The deserializer code (excerpted):

// Reader and decoder for reading avro records. The reader is initialized
// elsewhere in the class; only the relevant lines are shown here.
private var reader: DatumReader[T] = null
private var decoder: BinaryDecoder = null

// Reuse the decoder across messages, then read a fresh object (no reuse instance).
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)
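For reference, here is one plausible reconstruction of the full deserializer around that excerpt, assuming a ReflectDatumReader built from the target class (the actual reader construction is not shown in the excerpt); reflective reading is what requires the zero-argument constructor on DeviceData:

import org.apache.avro.io.{BinaryDecoder, DatumReader, DecoderFactory}
import org.apache.avro.reflect.ReflectDatumReader

// Assumed shape of the deserializer; usage: new AvroDeserializer(classOf[DeviceData]).deserialize(bytes)
class AvroDeserializer[T](clazz: Class[T]) {
  private val reader: DatumReader[T] = new ReflectDatumReader[T](clazz)
  private var decoder: BinaryDecoder = null // reused between calls, as in the excerpt

  def deserialize(message: Array[Byte]): T = {
    decoder = DecoderFactory.get.binaryDecoder(message, decoder)
    reader.read(null.asInstanceOf[T], decoder)
  }
}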

I could not find any other examples of constructors for case classes used to deserialize Avro. I had posted a related question last year, java.lang.NoSuchMethodException for init method in Scala case class, and based on the response I was able to implement my current code, which has been working fine ever since.

Recommended answer

I resolved this problem by taking a totally different approach. I used the Confluent Kafka client as provided in this example: https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink. I also run a Confluent schema registry, which is really easy to set up using the containerized all-in-one solution that comes with Kafka and a schema registry: https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html.

I had to add the Confluent dependencies and repository to my pom.xml file. This goes in the repositories section:

<repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
</repository>

And this goes in the dependencies section:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <!-- For Confluent Platform 5.2.1 -->
    <version>5.2.1</version>
</dependency>

With the code provided in https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala I was able to talk to the Confluent schema registry. Based on the schema id in the Avro message header, it downloads the schema from the registry and gives me back a GenericRecord object, from which I can easily extract any and all fields of interest and create a new DataStream of DeviceData objects.

val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
  new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
  properties)
val device_data_stream = env
  .addSource(kafka_consumer)
  .map({ x => DeviceData(x.get("deviceId").toString,
    x.get("sw_version").toString,
    x.get("timestamp").toString,
    x.get("reading").toString.toDouble,
    // Wrap in Option to match new_field: Option[String] and survive nulls.
    Option(x.get("new_field")).map(_.toString)) })

The Confluent Kafka client takes care of deserializing the Avro byte stream as per the schema, including the default values. Setting up the schema registry and using the Confluent Kafka client may take a little time to get used to, but it is probably the better long-term solution. Just my 2 cents.
