java.io.NotSerializableException with Spark Streaming Checkpoint enabled


Problem Description

I have enabled checkpointing in my Spark Streaming application, and I encounter this error on a class that comes in as a downloaded dependency.

With checkpointing disabled, the application works fine.

Error:

java.io.NotSerializableException: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer
Serialization stack:
    - object not serializable (class: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer, value: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer@46c7c593)
    - field (class: com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, name: _paranamer, type: interface com.fasterxml.jackson.module.paranamer.shaded.Paranamer)
    - object (class com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector@39d62e47)
    - field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _secondary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@7a925ac4)
    - field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _primary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@203b98cf)
    - field (class: com.fasterxml.jackson.databind.cfg.BaseSettings, name: _annotationIntrospector, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.cfg.BaseSettings, com.fasterxml.jackson.databind.cfg.BaseSettings@78c34153)
    - field (class: com.fasterxml.jackson.databind.cfg.MapperConfig, name: _base, type: class com.fasterxml.jackson.databind.cfg.BaseSettings)
    - object (class com.fasterxml.jackson.databind.DeserializationConfig, com.fasterxml.jackson.databind.DeserializationConfig@2df0a4c3)
    - field (class: com.fasterxml.jackson.databind.ObjectMapper, name: _deserializationConfig, type: class com.fasterxml.jackson.databind.DeserializationConfig)
    - object (class com.fasterxml.jackson.databind.ObjectMapper, com.fasterxml.jackson.databind.ObjectMapper@2db07651)

I am not sure how to make this class serializable, since it comes from a Maven dependency. I am using v2.6.0 of jackson-core in my pom.xml; if I try a newer version of the Jackson core I get an "Incompatible Jackson version" exception.
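
One common workaround (a sketch, not from the original post; the JsonSupport wrapper name is hypothetical) is to avoid serializing the mapper at all: declare the field as a @transient lazy val, so Java serialization skips it and each executor rebuilds it on first use after deserialization.

import com.fasterxml.jackson.databind.ObjectMapper

class JsonSupport extends Serializable {
  // @transient: skipped when the enclosing instance is serialized.
  // lazy: re-created on first use after deserialization, i.e. on the executor.
  @transient lazy val mapper: ObjectMapper = new ObjectMapper()
}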

Code:

liveRecordStream
      .foreachRDD(newRDD => {
        if (!newRDD.isEmpty()) {
          val cacheRDD = newRDD.cache()
          val updTempTables = tempTableView(t2s, stgDFMap, cacheRDD)
          val rdd = updatestgDFMap(stgDFMap, cacheRDD)
          persistStgTable(stgDFMap)
          dfMap
            .filter(entry => updTempTables.contains(entry._2))
            .map(spark.sql)
            .foreach( df => writeToES(writer, df))

          cacheRDD.unpersist()
        }
      })

The issue happens only when a method call occurs inside foreachRDD, like tempTableView in this case (a minimal sketch of why this matters follows the method below).

tempTableView:

def tempTableView(t2s: Map[String, StructType], stgDFMap: Map[String, DataFrame], cacheRDD: RDD[cacheRDD]): Set[String] = {
  stgDFMap.keys.filter { table =>
    val tRDD = cacheRDD
      .filter(r => r.Name == table)
      .map(r => r.values)
    val nonEmpty = !tRDD.isEmpty() // evaluate once instead of triggering two Spark jobs
    if (nonEmpty) {
      val tDF = spark.createDataFrame(tRDD, tableNameToSchema(table))
      tDF.createOrReplaceTempView(s"temp_$table")
    }
    nonEmpty
  }.toSet
}
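
To see why the method call matters, here is a minimal, hypothetical reconstruction of the pattern (the Pipeline class, its members, and the use of Gson are illustrative stand-ins, not the asker's actual code): referencing a method of the enclosing class from inside foreachRDD captures `this`, so Spark has to serialize the entire enclosing instance, non-serializable fields included.

import com.google.gson.Gson
import org.apache.spark.streaming.dstream.DStream

class Pipeline extends Serializable {
  val gson = new Gson() // Gson does not implement java.io.Serializable

  def toJson(value: String): String = gson.toJson(value)

  def run(stream: DStream[String]): Unit = {
    stream.foreachRDD { rdd =>
      // Calling the method toJson captures `this`, so Spark serializes the
      // whole Pipeline instance -- including the non-serializable gson
      // field -- into the closure (and, with checkpointing enabled, into
      // the checkpoint data), throwing java.io.NotSerializableException.
      rdd.map(toJson).foreach(println)
    }
  }
}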

Any help is appreciated. I am not sure how to debug this and fix the issue.

Recommended Answer

From the code snippet you shared, I don't see where the jackson library is invoked. However, a NotSerializableException usually happens when you try to send an object that does not implement the Serializable interface over the wire.

Spark is a distributed processing engine: there is a driver and multiple executors spread across nodes, and the driver ships only the code that needs to be computed to the executors, over the wire. Spark transformations execute this way, across multiple nodes, so if you pass an instance of a class that does not implement the Serializable interface into such a code block (one that executes across nodes), it throws a NotSerializableException. Enabling checkpointing makes this stricter still: Spark must also serialize the DStream operation graph, including every object captured by a foreachRDD closure, into the checkpoint, which is why the error shows up only when checkpointing is on.

For example:

import com.google.gson.Gson
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
   val gson: Gson = new Gson() // created on the driver; Gson is not Serializable

   // an app name is required for the example to run standalone; any name will do
   val sparkConf = new SparkConf().setMaster("local[2]").setAppName("gson-example")
   val spark = SparkSession.builder().config(sparkConf).getOrCreate()
   val rdd = spark.sparkContext.parallelize(Seq("0", "1"))

   // The lambda captures gson from the driver, so Spark tries to serialize it
   val something = rdd.map(str => {
     gson.toJson(str)
   })

   something.foreach(println)
   spark.close()
}

This code block throws a NotSerializableException because we are sending an instance of Gson into a distributed function: map is a Spark transformation, so it executes on the executors. The following will work:

import com.google.gson.Gson
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setMaster("local[2]").setAppName("gson-example")
   val spark = SparkSession.builder().config(sparkConf).getOrCreate()
   val rdd = spark.sparkContext.parallelize(Seq("0", "1"))

   // Gson is created inside the transformation, i.e. on each executor,
   // so nothing non-serializable has to cross the wire
   val something = rdd.map(str => {
     val gson: Gson = new Gson()
     gson.toJson(str)
   })

   something.foreach(println)
   spark.close()
}

The reason the above works is that we instantiate Gson inside the transformation, so it is instantiated on the executor; it is never sent from the driver program over the wire, and therefore no serialization is needed.
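
Creating a new Gson for every record works but is wasteful. Two common refinements (a sketch under assumptions not in the original answer; the GsonHolder name is made up) are to hold the non-serializable object in an executor-side singleton, or to build one instance per partition with mapPartitions; the same patterns apply to the asker's jackson ObjectMapper.

import com.google.gson.Gson
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// A Scala object is never serialized with a closure; it is initialized
// lazily and independently inside each executor JVM.
object GsonHolder {
  lazy val gson: Gson = new Gson()
}

def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setMaster("local[2]").setAppName("gson-refinements")
   val spark = SparkSession.builder().config(sparkConf).getOrCreate()
   val rdd = spark.sparkContext.parallelize(Seq("0", "1"))

   // Option 1: executor-side singleton, one Gson per JVM.
   val viaSingleton = rdd.map(str => GsonHolder.gson.toJson(str))

   // Option 2: one Gson per partition instead of one per record.
   val viaPartition = rdd.mapPartitions { iter =>
     val gson = new Gson()
     iter.map(str => gson.toJson(str))
   }

   viaSingleton.foreach(println)
   viaPartition.foreach(println)
   spark.close()
}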

