NullPointerException in Spark RDD map when submitted as a spark job
Question
We're trying to submit a Spark job (Spark 2.0, Hadoop 2.7.2), but for some reason we're receiving a rather cryptic NPE on EMR. Everything runs just fine as a standalone Scala program, so we're not really sure what's causing the issue. Here's the stack trace:
18:02:55,271 ERROR Utils:91 - Aborting task
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
As far as we can tell this is occurring in the following method:
def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.map(row =>
    "text|label"
  ).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
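(For reference, a sketch of how the same output can be produced without a `map` closure at all, assuming the `"text|label"` literal stands in for joining two columns named `text` and `label` — both names are assumptions here. Keeping the concatenation in `concat_ws` means no lambda is shipped to the executors, which sidesteps closure-serialization problems entirely:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.concat_ws

// Hypothetical variant: build the "text|label" string with a Catalyst
// expression instead of a map() lambda, so nothing driver-side is captured.
def process(dataFrame: DataFrame, s3Bucket: String): Unit = {
  dataFrame
    .select(concat_ws("|", dataFrame("text"), dataFrame("label")).as("value"))
    .coalesce(1)
    .write
    .mode(SaveMode.Overwrite)
    .text(s3Bucket)
}
```

This is only an illustration of the column-based alternative, not a claim about what the original `map` body was meant to produce.)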
We've narrowed it down to the map function, as this version works when submitted as a Spark job:
def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
Does anyone have any idea what might be causing this issue? Also, how can we resolve it? We're pretty stumped.
I think the NullPointerException is thrown by a worker when it tries to access a SparkContext object that is only present on the driver, not on the workers.
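To illustrate the failure mode (a hypothetical minimal sketch, not the asker's actual code): the SparkContext is not serializable, so if a closure captures it, the reference arrives on the executor as null and dereferencing it throws an NPE at runtime.

```scala
import org.apache.spark.sql.SparkSession

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("npe-demo").getOrCreate()
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 10)

    // BAD (hypothetical): `sc` is captured by the lambda; on the worker it
    // is null, so this fails with a NullPointerException at runtime:
    // rdd.map(x => sc.parallelize(Seq(x)).count).collect()

    // OK: the closure only touches serializable local values.
    val doubled = rdd.map(_ * 2).collect()
    println(doubled.mkString(","))
  }
}
```

The general rule is that anything referenced inside a transformation's closure must be serializable and meaningful on the executors.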
coalesce() repartitions your data. When you request only one partition, it will try to squeeze all the data into that single partition*. That can put a lot of pressure on the memory footprint of your application.
In general, it is a good idea not to shrink your number of partitions down to just 1.
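If a single output file is not strictly required, one alternative (a sketch, assuming downstream tooling can read multiple part files; the partition count and variable names are illustrative) is to keep several partitions and let each task write its own part file:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: write with several partitions so no single task has to hold
// the whole dataset. 8 is an arbitrary example; tune to your data volume.
def write(dataFrame: DataFrame, s3Bucket: String): Unit = {
  dataFrame
    .repartition(8)
    .write
    .mode(SaveMode.Overwrite)
    .text(s3Bucket)
}
```

Merging the resulting part files, if needed, can then happen outside Spark (for example with a downstream copy step), keeping the memory pressure off any one executor.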
For more, read this: Spark NullPointerException with saveAsTextFile and this.
* In case you are not sure what a partition is, I explained it to myself in memoryOverhead issue in Spark.