Spark: Task not serializable Exception in forEach loop in Java


Problem description

I'm trying to iterate over a JavaPairRDD and perform some calculations with the keys and values of the JavaPairRDD, then output the result for each pair into a processedData list.

What I already tried:

  • made the variables I use inside the lambda function static
  • made the methods I call from the lambda in the foreach loop static
  • added implements Serializable

Here is my code:

    List<String> processedData = new ArrayList<>();

    // groupedByWebLabData is built earlier; only its type matters here
    JavaPairRDD<WebLabGroupObject, Iterable<WebLabPurchasesDataObject>> groupedByWebLabData = ...;

    groupedByWebLabData.foreach(data -> {

        JavaRDD<WebLabPurchasesDataObject> oneGroupOfData = convertIterableToJavaRdd(data._2());

        double opsForOneGroup = getOpsForGroup(oneGroupOfData);
        double unitsForOneGroup = getUnitsForGroup(oneGroupOfData);

        String combinedOutputForOneGroup = data._1().getProductGroup() + "," + opsForOneGroup + "," + unitsForOneGroup;

        processedData.add(combinedOutputForOneGroup);
    });



    private JavaRDD<WebLabPurchasesDataObject> convertIterableToJavaRdd(Iterable<WebLabPurchasesDataObject> groupedElements)
    {
        List<WebLabPurchasesDataObject> list = new ArrayList<>();
        groupedElements.forEach(el -> list.add(el));
        return this.context.parallelize(list);
    }

Here is the exception itself:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:797)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:312)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
at com.amazon.videoads.emr.spark.WebLabDataAnalyzer.processWebLabData(WebLabDataAnalyzer.java:121)
at com.amazon.videoads.emr.spark.WebLabMetricsApplication.main(WebLabMetricsApplication.java:110)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@395e9596)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 2)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class com.amazon.videoads.emr.spark.WebLabDataAnalyzer$$Lambda$14/1536342848, com.amazon.videoads.emr.spark.WebLabDataAnalyzer$$Lambda$14/1536342848@5acc8c7c)
- field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, name: f$14, type: interface org.apache.spark.api.java.function.VoidFunction)
- object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 16 more

Answer

TL;DR: you're trying to use a JavaSparkContext inside the foreach over your groupedByWebLabData RDD. You can't do that, because JavaSparkContext is not serializable.

The stack trace is quite helpful here:

Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext

It means that:

  • you're trying to serialize something that cannot be serialized
  • and that something is the JavaSparkContext

This is caused by these two lines:

    groupedByWebLabData.foreach(data -> {
        JavaRDD<WebLabPurchasesDataObject> oneGroupOfData = convertIterableToJavaRdd(data._2());

because convertIterableToJavaRdd, which is called for each element of your RDD, uses

    this.context.parallelize(list)

i.e. it uses a JavaSparkContext: you're trying to use a JavaSparkContext on your executors (where the data making up your groupedByWebLabData RDD lives). Well, you can't do that, since a JavaSparkContext is not serializable.
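
Why does the JavaSparkContext end up in the closure at all? Because convertIterableToJavaRdd is an instance method, the lambda implicitly captures the enclosing object (the capturedArgs entry in the serialization stack above), and that object holds the non-serializable context field. Here is a minimal sketch of the capture; the class outline is hypothetical, and only the names taken from the question and the stack trace are real:

    public class WebLabDataAnalyzer {
        private JavaSparkContext context;  // not serializable

        private void processWebLabData(JavaPairRDD<WebLabGroupObject, Iterable<WebLabPurchasesDataObject>> rdd) {
            rdd.foreach(data -> {
                // calling an instance method really means this.convertIterableToJavaRdd(...),
                // so Spark must serialize the whole analyzer object, context field included,
                // to ship the closure to the executors -- hence the NotSerializableException
                convertIterableToJavaRdd(data._2());
            });
        }
    }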

What you're doing here can probably be done via a UDF, and you can collect the result (if it's not too big).
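
Staying with the RDD API, a minimal sketch of one way to restructure this: do the per-group aggregation on the executors with a plain map, never touching the JavaSparkContext, and collect the resulting strings on the driver. (Adding to a driver-side ArrayList inside foreach would not work on a cluster anyway: each executor only mutates its own deserialized copy of the list.) The helpers opsFor and unitsFor are hypothetical static replacements for getOpsForGroup/getUnitsForGroup that aggregate over a plain Iterable instead of a JavaRDD:

    List<String> processedData = groupedByWebLabData
        .map(data -> {
            Iterable<WebLabPurchasesDataObject> group = data._2();
            // static helpers keep the lambda from capturing `this` again
            double ops = opsFor(group);
            double units = unitsFor(group);
            return data._1().getProductGroup() + "," + ops + "," + units;
        })
        .collect();  // brings the small per-group summary strings back to the driver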
