Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

Question

Getting strange behavior when calling function outside of a closure:

  • works fine, when the function is in an object
  • when the function is in a class get:

Task not serializable: java.io.NotSerializableException: testing

The problem is I need my code in a class and not an object. Any idea why this is happening? Is a Scala object serialized by default?

This is a working code example:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

And this is the non-working example:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable won't help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the function someFunc
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

Answer

RDDs extend the Serializable interface, so this is not what's causing your task to fail. That doesn't mean, however, that you can serialize an RDD with Spark and avoid a NotSerializableException.

Spark is a distributed computing engine and its main abstraction is a resilient distributed dataset (RDD), which can be viewed as a distributed collection. Basically, RDD's elements are partitioned across the nodes of the cluster, but Spark abstracts this away from the user, letting the user interact with the RDD (collection) as if it were a local one.
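
For illustration only, here is a small self-contained program in that spirit (the object name is made up for the demo; it uses a local[*] master, just like the snippets further down in this answer):

import org.apache.spark.{SparkConf, SparkContext}

object DistributedCollectionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[*]"))

    // The data is partitioned (across local threads here, across cluster nodes in
    // production), but the API reads just like a local Scala collection.
    val rdd = sc.parallelize(1 to 10)
    println(rdd.map(_ * 2).filter(_ > 10).count())  // 5

    sc.stop()
  }
}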

Not to get into too many details, but when you run different transformations on an RDD (map, flatMap, filter and others), your transformation code (the closure) is (see the sketch after this list):

  1. serialized on the driver node,
  2. shipped to the appropriate nodes in the cluster,
  3. deserialized,
  4. and finally executed on the nodes
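
As a rough illustration of step 1, here is a minimal sketch (ClosureCheck and isSerializable are made-up names, not Spark internals or API): it simply checks whether a closure survives plain Java serialization, which is essentially the requirement a task closure has to meet before Spark can ship it.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCheck {
  // Returns true if `f` can be serialized with plain Java serialization,
  // i.e. roughly what Spark needs to do with a task closure before shipping it.
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}

object ClosureCheckDemo {
  class NotOk { def bump(a: Int): Int = a + 1 }  // does not implement Serializable

  def main(args: Array[String]): Unit = {
    println(ClosureCheck.isSerializable((a: Int) => a + 1))           // true: a plain function value
    val holder = new NotOk
    println(ClosureCheck.isSerializable((a: Int) => holder.bump(a)))  // false: the closure captures holder
  }
}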

You can of course run this locally (as in your example), but all those phases (apart from the shipping over the network) still occur. [This lets you catch any bugs even before deploying to production.]

What happens in your second case is that you are calling a method defined in class testing from inside the map function. Spark sees that, and since methods cannot be serialized on their own, Spark tries to serialize the whole testing class, so that the code will still work when executed in another JVM.
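
To see why the whole class gets dragged in, here is a purely illustrative sketch of what someFunc(_) inside a class effectively compiles to (Enclosing and outer are made-up names; the compiler's synthetic names look like testing$$anonfun$1 and $outer, as in the serialization stack further down):

// Illustrative only: roughly the shape of the anonymous function the compiler
// generates for rddList.map(someFunc(_)) when someFunc is a method of the class.
class Enclosing {                                    // stands in for class testing
  def someFunc(a: Int): Int = a + 1

  // someFunc(_) becomes a function object that keeps a reference to the
  // enclosing instance, because it has to call this.someFunc at runtime.
  val closure: Int => Int = new Function1[Int, Int] with java.io.Serializable {
    private val outer: Enclosing = Enclosing.this    // the hidden $outer reference
    def apply(a: Int): Int = outer.someFunc(a)
  }
  // Serializing `closure` therefore means serializing the whole Enclosing
  // instance, which fails unless Enclosing itself is Serializable.
}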

You have two possibilities. Either you make class testing serializable, so the whole class can be serialized by Spark:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

Or you make someFunc a function instead of a method (functions are objects in Scala), so that Spark will be able to serialize it:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}
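
As a quick, self-contained check of the claim that functions are objects in Scala (the names below are made up for the demo; the observation assumes a recent Scala 2.x compiler, where anonymous function classes are marked serializable), a function value already implements java.io.Serializable and can be shipped as-is:

object FunctionIsAnObject {
  def main(args: Array[String]): Unit = {
    val someFunc = (a: Int) => a + 1                      // an instance of Function1[Int, Int]
    println(someFunc.isInstanceOf[java.io.Serializable])  // true: safe to send to executors
    println(someFunc(41))                                 // 42: and it is still just a function
  }
}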

A similar, but not identical, problem with class serialization may also be of interest to you; you can read about it in this Spark Summit 2013 presentation.

As a side note, you can rewrite rddList.map(someFunc(_)) as rddList.map(someFunc); they are exactly the same. Usually the second form is preferred, as it's less verbose and cleaner to read.
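
A tiny self-contained illustration of that equivalence, using a plain Scala collection so it needs no Spark at all (EtaExpansionDemo is just a made-up name for the demo):

object EtaExpansionDemo {
  def someFunc(a: Int): Int = a + 1

  def main(args: Array[String]): Unit = {
    val xs = List(1, 2, 3)
    // Both forms compile to the same thing: the method someFunc is lifted
    // (eta-expanded) into a function value before being passed to map.
    println(xs.map(someFunc(_)))  // List(2, 3, 4)
    println(xs.map(someFunc))     // List(2, 3, 4)
  }
}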

EDIT (2015-03-15): SPARK-5307 introduced SerializationDebugger, and Spark 1.3.0 is the first version to use it. It adds the serialization path to a NotSerializableException: when a NotSerializableException is encountered, the debugger visits the object graph to find the path towards the object that cannot be serialized, and constructs information to help the user find that object.

In the OP's case, this is what gets printed to stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
