Serialization Exception on Spark


Question

I have run into a very strange serialization problem on Spark. The code is below:

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
    def infer(documents: RDD[Document]): RDD[DocumentParameter] = {
      val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
      docs
    }
}

where Document is defined as:

class Document(val tokens: SparseVector[Int]) extends Serializable

and DocumentParameter is:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter extends Serializable
{
  def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, 
    Array.ofDim[Float](numOfTopics))
}

SparseVector is a serializable class from breeze.linalg.

This is a simple map procedure, and all the classes are serializable, but I get this exception:

org.apache.spark.SparkException: Task not serializable

But when I remove the numOfTopics parameter, that is:

object DocumentParameter extends Serializable
{
  def apply(document: Document) = new DocumentParameter(document, 
    Array.ofDim[Float](10))
}

and call it like this:

val docs = documents.map(DocumentParameter.apply)

then it seems to work.

Is the type Int not serializable? But I do see other code written like that.

I am not sure how to fix this error.

UPDATE:

Thank you @samthebest. I will add more details about it.

stack trace:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.topicmodel.PLSA.infer(PLSA.scala:13)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC.<init>(<console>:41)
    at $iwC.<init>(<console>:43)
    at <init>(<console>:45)
    at .<init>(<console>:49)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 46 more

(I trimmed the remainder of the stack trace, since it only gives general information about the exception.)

I run the code in spark-shell:

// suppose docs: RDD[Document] has already been obtained
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)

Could you give me some tutorials or tips on serialization?

Answer

Anonymous functions serialize their containing class. When you map {doc => DocumentParameter(doc, numOfTopics)}, the only way it can give that function access to numOfTopics is to serialize the PLSA class. And that class can't actually be serialized, because (as you can see from the stack trace) it contains the SparkContext, which isn't serializable (Bad Things would happen if individual cluster nodes had access to the context and could e.g. create new jobs from within a mapper).
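To make that concrete, here is a minimal sketch (it reuses the names from the question, but it is not the poster's exact code): inside infer, the reference to numOfTopics is really this.numOfTopics, so the closure drags in the whole PLSA instance, SparkContext included. Copying the field into a local val first means the closure only captures a plain Int:

def infer(documents: RDD[Document]): RDD[DocumentParameter] = {
  // copy the field into a local value; the closure below then captures this Int
  // by value instead of capturing `this` (and with it the SparkContext)
  val topics = numOfTopics
  documents.map(doc => DocumentParameter(doc, topics))
}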

In general, try to avoid storing the SparkContext in your classes (edit: or at least, make sure it's very clear what kind of classes contain the SparkContext and what kind don't); it's better to pass it as a (possibly implicit) parameter to individual methods that need it. Alternatively, move the function {doc => DocumentParameter(doc, numOfTopics)} into a different class from PLSA, one that really can be serialized.
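A hedged sketch of that restructuring (the layout here is illustrative, not the only correct one): PLSA keeps only the serializable state it needs in task closures, and anything that really needs a SparkContext receives it as a parameter.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// PLSA no longer stores the SparkContext, so serializing an instance into
// task closures is harmless: it only carries a plain Int.
class PLSA(val numOfTopics: Int) extends Serializable {
  def infer(documents: RDD[Document]): RDD[DocumentParameter] =
    documents.map(doc => DocumentParameter(doc, numOfTopics))
}

// Driver-side helpers that genuinely need the context take it as a
// (possibly implicit) parameter instead of holding it in a field.
def loadCorpus(path: String)(implicit sc: SparkContext): RDD[String] =
  sc.textFile(path)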

(As multiple people have suggested, it's possible to keep the SparkContext in the class but marked as @transient so that it won't be serialized. I don't recommend this approach; it means the class will "magically" change state when serialized (losing the SparkContext), and so you might end up with NPEs when you try to access the SparkContext from inside a serialized job. It's better to maintain a clear distinction between classes that are only used in the "control" code (and might use the SparkContext) and classes that are serialized to run on the cluster (which must not have the SparkContext).)
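For completeness, the @transient variant mentioned above looks roughly like this; note that on the executors sc will simply be null:

// @transient tells Java serialization to skip the field, so PLSA itself becomes
// serializable; on executors, however, sc is null, and any attempt to use it
// inside a task throws a NullPointerException.
class PLSA(@transient val sc: SparkContext, val numOfTopics: Int) extends Serializable {
  def infer(documents: RDD[Document]): RDD[DocumentParameter] =
    documents.map(doc => DocumentParameter(doc, numOfTopics))
}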
