Task not serializable exception while running Apache Spark job


Question

The following Java program was written to experiment with Apache Spark.

The program reads a list of positive words and a list of negative words, each from its own file, compares them against the master file, and filters the results accordingly.

import java.io.Serializable;
import java.io.FileNotFoundException;
import java.io.File;
import java.util.*;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class SimpleApp implements Serializable{
  public static void main(String[] args) {
    String logFile = "/tmp/master.txt"; // Should be some file on your system
    String positive = "/tmp/positive.txt"; // Should be some file on your system
    String negative = "/tmp/negative.txt"; // Should be some file on your system

    JavaSparkContext sc = new JavaSparkContext("local[4]", "Twitter Analyzer", "/home/welcome/Downloads/spark-1.1.0/", new String[]{"target/scala-2.10/Simple-assembly-0.1.0.jar"});

    JavaRDD<String> positiveComments = sc.textFile(logFile).cache();

    List<String> positiveList = GetSentiments(positive);
    List<String> negativeList= GetSentiments(negative);

    final Iterator<String> iterator = positiveList.iterator();
    int i = 0;
    while (iterator.hasNext())
    {
      JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        public Boolean call(String s)
        {
          return s.contains(iterator.next());
        }
      });

     numAs.saveAsTextFile("/tmp/output/"+ i);
     i++;
     }

  }

  public static List<String> GetSentiments(String fileName) {
    List<String> input = new ArrayList<String>();
    try
    {
      Scanner sc = new Scanner(new File(fileName));

      while (sc.hasNextLine()) {
        input.add(sc.nextLine());
      }
    }
    catch (FileNotFoundException e) {
      // do stuff here..
    }
    return input;
  }

}

The following error is thrown while executing the Spark job:

 Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
    at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
    at SimpleApp.main(SimpleApp.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.util.ArrayList$Itr
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more

Any pointers?

Recommended answer

When you create an anonymous class, the compiler copies each local variable the class uses into a hidden field of that class. For example:

JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        public Boolean call(String s)
        {
          return s.contains(iterator.next());
        }
      });

This is effectively rewritten as:

JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        private Iterator<...> $iterator;
        public Boolean call(String s)
        {
          return s.contains($iterator.next());
        }
      });

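This is why you get a NotSerializableException: Spark has to serialize the function object before it can run the task, the captured Iterator is serialized along with it as a field, and the Iterator (here java.util.ArrayList$Itr) is not serializable.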
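The capture can be reproduced without Spark. The following is a minimal sketch using only the JDK; `CaptureDemo`, `LineFilter`, and the helper methods are hypothetical names introduced for illustration, and `LineFilter` merely stands in for Spark's serializable `Function<String, Boolean>`. It lists the hidden fields of the anonymous class and then tries to serialize it with plain Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CaptureDemo {
    // Hypothetical stand-in for Spark's Function<String, Boolean>.
    interface LineFilter extends Serializable {
        boolean call(String s);
    }

    // Anonymous class that captures the iterator, as in the question.
    static LineFilter capturingIterator(List<String> words) {
        final Iterator<String> iterator = words.iterator();
        return new LineFilter() {
            public boolean call(String s) {
                return s.contains(iterator.next());
            }
        };
    }

    // Anonymous class that captures only a String.
    static LineFilter capturingString(List<String> words) {
        final String value = words.iterator().next();
        return new LineFilter() {
            public boolean call(String s) {
                return s.contains(value);
            }
        };
    }

    // Simple type names of the fields the compiler generated for the object's class.
    static List<String> fieldTypes(Object o) {
        List<String> types = new ArrayList<String>();
        for (Field f : o.getClass().getDeclaredFields()) {
            types.add(f.getType().getSimpleName());
        }
        return types;
    }

    // Returns null if the object serializes, otherwise the name of the offending class.
    static String trySerialize(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<String>(Arrays.asList("good", "great"));
        System.out.println(fieldTypes(capturingIterator(words)));   // captured field types
        System.out.println(trySerialize(capturingIterator(words))); // offending class name
        System.out.println(trySerialize(capturingString(words)));   // null: serializes fine
    }
}
```

The second line printed names the same class as the `Caused by` line in the stack trace, while the String-capturing variant serializes without error.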

To avoid that, call next() before creating the function and capture only the resulting String, which is serializable:

// Must be declared final to be captured by an anonymous class on Java 7 and earlier.
final String value = iterator.next();
JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        public Boolean call(String s)
        {
          return s.contains(value);
        }
      });
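Extracting the value first also fixes the loop logic: in the question, next() sits inside call(), so it would run once per input line rather than once per pass of the while loop. As a rough JDK-only sketch of the corrected loop (no Spark involved; `PerWordFilterDemo`, `LineFilter`, `containsWord`, `roundTrip`, and `filterPerWord` are hypothetical names, and the serialization round trip only imitates what Spark does to a closure), each word is pulled out on the driver side and only that String is captured:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerWordFilterDemo {
    // Hypothetical stand-in for Spark's Function<String, Boolean>.
    interface LineFilter extends Serializable {
        boolean call(String s);
    }

    // The anonymous class captures only the String, never the iterator.
    static LineFilter containsWord(final String value) {
        return new LineFilter() {
            public boolean call(String s) {
                return s.contains(value);
            }
        };
    }

    // Serializes and deserializes the filter, imitating shipping a closure to a worker.
    static LineFilter roundTrip(LineFilter filter) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(filter);
            out.flush();
            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return (LineFilter) in.readObject();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // One filter per word; the loop over the words advances outside the filter.
    static Map<String, List<String>> filterPerWord(List<String> lines, List<String> words) {
        Map<String, List<String>> result = new LinkedHashMap<String, List<String>>();
        for (String word : words) {
            LineFilter filter = roundTrip(containsWord(word));
            List<String> matches = new ArrayList<String>();
            for (String line : lines) {
                if (filter.call(line)) {
                    matches.add(line);
                }
            }
            result.put(word, matches);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("good day", "bad day", "great good");
        List<String> words = Arrays.asList("good", "great");
        System.out.println(filterPerWord(lines, words));
    }
}
```

In the Spark program the same shape would apply: loop over positiveList on the driver, take one word per pass, and build one filter (and one output directory) per word.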
