java.io.NotSerializableException in Spark Streaming with enabled checkpointing


Problem description

The following code:

import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

object Test {
  def main(args: Array[String]) {
    val sc = new SparkContext
    val sec = Seconds(3)
    val ssc = new StreamingContext(sc, sec)
    ssc.checkpoint("./checkpoint")
    val rdd = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
    val inputDStream = new ConstantInputDStream(ssc, rdd)

    inputDStream.transform(rdd => {
      val buf = ListBuffer[String]()
      buf += "1"
      buf += "2"
      buf += "3"
      val other_rdd = ssc.sparkContext.parallelize(buf)   // create a new rdd
      rdd.union(other_rdd)
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

throws this exception:

java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.streaming.StreamingContext
Serialization stack:
    - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@5626e185)
    - field (class: com.mirrtalk.Test$$anonfun$main$1, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext)
    - object (class com.mirrtalk.Test$$anonfun$main$1, <function1>)
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
    - field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)

When I remove the line ssc.checkpoint("./checkpoint"), the application works fine, but I need checkpointing enabled.

How can I fix this while keeping checkpointing enabled?

Recommended answer

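You can move the context initialization and configuration out of main and into the enclosing object. In the original code, the function passed to transform refers to the local ssc, so the compiler stores it as a field of the function object (ssc$1 in the serialization stack above). With checkpointing enabled, Spark serializes the DStream graph together with that function, and StreamingContext is not serializable. When ssc is a member of object App, the function reaches it through the singleton object instead of capturing it: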

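import scala.collection.mutable.ListBuffer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

object App {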
  val sc = new SparkContext(new SparkConf().setAppName("foo").setMaster("local"))
  val sec = Seconds(3)
  val ssc = new StreamingContext(sc, sec)
  ssc.checkpoint("./checkpoint") // enable checkpoint

  def main(args: Array[String]) {
    val rdd = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
    val inputDStream = new ConstantInputDStream(ssc, rdd)

    inputDStream.transform(rdd => {
      val buf = ListBuffer[String]()
      buf += "1"
      buf += "2"
      buf += "3"
      val other_rdd = ssc.sparkContext.parallelize(buf)
      rdd.union(other_rdd) // I want to union other RDD
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
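The mechanism behind this fix can be reproduced without Spark. The following is a minimal sketch, assuming plain Java serialization of Scala functions (Scala 2.12 or later); FakeContext, Holder and ClosureCaptureDemo are hypothetical names, with FakeContext standing in for the non-serializable StreamingContext. It does not model Spark's closure cleaner or DStream checkpointing, only the difference between capturing a local value and referring to a member of a top-level object.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately NOT Serializable.
class FakeContext { val base = 10 }

// Hypothetical top-level object holding the context, analogous to object App above.
object Holder {
  val ctx = new FakeContext
}

object ClosureCaptureDemo {
  // Java-serializes obj; returns false if a NotSerializableException is thrown.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Like the question: the function closes over a local value,
  // so the FakeContext becomes part of the function's serialized state.
  def capturesLocal(): Int => Int = {
    val local = new FakeContext
    x => x + local.base
  }

  // Like the answer: the function reaches the context through the top-level
  // object Holder, which is accessed statically rather than captured.
  def usesObjectMember(): Int => Int =
    x => x + Holder.ctx.base

  def main(args: Array[String]): Unit = {
    println(s"captures local:     ${serializes(capturesLocal())}")
    println(s"uses object member: ${serializes(usesObjectMember())}")
  }
}
```

On a typical Scala 2.12+ toolchain the first function should fail to serialize and the second should succeed. Note that this only shows why the closure becomes serializable; it says nothing about recovering the streaming job from an existing checkpoint.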
