Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects


Problem description

I'm getting strange behavior when calling a function outside of a closure:

  • when the function is in an object, everything works
  • when the function is in a class, I get:

    Task not serializable: java.io.NotSerializableException: testing
    

The problem is that I need my code in a class and not an object. Any idea why this is happening? Are Scala objects serializable by default?

This is a working code example:

object working extends App {
  val list = List(1, 2, 3)

  val rddList = Spark.ctx.parallelize(list)
  // calling function outside closure
  val after = rddList.map(someFunc(_))

  def someFunc(a: Int) = a + 1

  after.collect().map(println(_))
}

This is the non-working example:

object NOTworking extends App {
  new testing().doIT
}

// adding extends Serializable won't help
class testing {

  val list = List(1, 2, 3)

  val rddList = Spark.ctx.parallelize(list)

  def doIT = {
    // again calling the function someFunc
    val after = rddList.map(someFunc(_))
    // this will crash (Spark is lazy)
    after.collect().map(println(_))
  }

  def someFunc(a: Int) = a + 1

}

Solution

I don't think the other answer is entirely correct. RDDs are indeed serializable (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L78), so that is not what's causing your task to fail.

Spark is a distributed computing engine and its main abstraction is a resilient distributed dataset (RDD), which can be viewed as a distributed collection. Basically, RDD's elements are partitioned across the nodes of the cluster, but Spark abstracts this away from the user, letting the user interact with the RDD (collection) as if it were a local one.

Not to get into too many details, but when you run different transformations on an RDD (map, flatMap, filter and others), your transformation code (the closure) is:

  1. serialized on the driver node,
  2. shipped to the appropriate nodes in the cluster,
  3. deserialized,
  4. and finally executed on the nodes

You can of course run this locally (as in your example), but all those phases (apart from shipping over network) still occur. [This lets you catch any bugs even before deploying to production]
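
To make that cycle concrete, here is a minimal, Spark-free sketch (the object name ClosureRoundTrip is made up for illustration) using plain Java serialization, which is also what Spark uses for closures: it serializes a function to bytes, deserializes it, and runs it, mirroring steps 1, 3 and 4 above.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureRoundTrip extends App {
  val someFunc = (a: Int) => a + 1

  // 1. serialize the closure to bytes (what the driver does)
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(someFunc)
  out.close()

  // 2. shipping the bytes over the network is skipped in this local sketch

  // 3. deserialize the closure (what an executor does)
  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  val revived = in.readObject().asInstanceOf[Int => Int]

  // 4. execute it
  println(revived(1)) // prints 2
}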

What happens in your second case is that you are calling a method defined in class testing from inside the map function. Spark sees that, and since methods cannot be serialized on their own, Spark tries to serialize the whole testing class, so that the code will still work when executed in another JVM. You have two possibilities:

Either you make class testing serializable, so the whole class can be serialized by Spark:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

or you make someFunc a function instead of a method (functions are objects in Scala), so that Spark will be able to serialize it:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}
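
To see concretely why the method version drags in the whole class, here is a minimal, Spark-free sketch (the names NotShippable and CaptureDemo are made up for illustration). A function value that calls an instance method has to close over the enclosing instance, so plain Java serialization rejects the non-serializable class, while a function value that only uses its argument serializes fine:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// a non-serializable class, much like the OP's testing class
class NotShippable {
  def someFunc(a: Int) = a + 1

  // calls an instance method, so it captures the enclosing NotShippable instance
  val viaMethod: Int => Int = a => someFunc(a)

  // uses only its argument, so it captures nothing from the class
  val standalone: Int => Int = a => a + 1
}

object CaptureDemo extends App {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  out.writeObject(new NotShippable().standalone) // works: nothing non-serializable is captured
  out.writeObject(new NotShippable().viaMethod)  // throws java.io.NotSerializableException (NotShippable)
}

This is the same capture that shows up as the $outer field in the serialization stack printed at the end of this answer.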

A similar, but not identical, problem with class serialization may also be of interest; you can read about it in this Spark Summit 2013 presentation: http://spark-summit.org/wp-content/uploads/2013/10/McDonough-spark-tutorial_spark-summit-2013.pptx

As a side note, you can rewrite rddList.map(someFunc(_)) as rddList.map(someFunc); they are exactly the same. Usually, the second form is preferred as it's less verbose and cleaner to read.
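
A quick plain-Scala illustration of that equivalence (no Spark needed; the object name EtaDemo is made up):

object EtaDemo extends App {
  def someFunc(a: Int) = a + 1
  val xs = List(1, 2, 3)

  // both forms turn the method someFunc into a function value before passing it to map
  println(xs.map(someFunc(_))) // List(2, 3, 4)
  println(xs.map(someFunc))    // List(2, 3, 4)
}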

EDIT (2015-03-15): SPARK-5307 introduced SerializationDebugger, and Spark 1.3.0 is the first version to use it. It adds the serialization path to a NotSerializableException: when a NotSerializableException is encountered, the debugger visits the object graph to find the path towards the object that cannot be serialized, and constructs information to help the user find that object.

In OP's case, this is what gets printed to stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
