Spark fails on big shuffle jobs with java.io.IOException: Filesystem closed
Problem description
I often find that Spark fails on large jobs with a rather unhelpful exception. The worker logs look normal, with no errors, but the workers end up in state "KILLED". This is extremely common for large shuffles, i.e. operations like .distinct.
The question is, how do I diagnose what's going wrong, and ideally, how do I fix it?
Given that a lot of these operations are monoidal, I've been working around the problem by splitting the data into, say, 10 chunks, running the app on each chunk, then running the app on all of the resulting outputs. In other words, meta-map-reduce.
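To make the workaround concrete, here is a minimal plain-Python sketch of the "meta-map-reduce" idea for a distinct. It does not use Spark at all; the helper names (split_into_chunks, distinct, chunked_distinct) and the sample records are made up for illustration, and each call to distinct stands in for one full run of the application.

```python
from typing import Iterable, List, Set


def split_into_chunks(records: List[str], n_chunks: int) -> List[List[str]]:
    """Deal records round-robin into n_chunks lists (hypothetical helper)."""
    return [records[i::n_chunks] for i in range(n_chunks)]


def distinct(records: Iterable[str]) -> Set[str]:
    """Stand-in for one run of the application (here: a distinct)."""
    return set(records)


def chunked_distinct(records: List[str], n_chunks: int = 10) -> Set[str]:
    # Pass 1: run the job separately on each (smaller) chunk.
    partials = [distinct(chunk) for chunk in split_into_chunks(records, n_chunks)]
    # Pass 2: run the same job once more over all of the partial outputs.
    combined = [item for partial in partials for item in partial]
    return distinct(combined)


# Illustrative data only.
records = ["a", "b", "a", "c", "b", "a", "d"]
result = chunked_distinct(records, n_chunks=3)
```

This only works because the operation is associative: a distinct over the distinct outputs of the chunks gives the same answer as a distinct over everything at once, so each pass handles less data than the single big job would.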
14/06/04 12:56:09 ERROR client.AppClient$ClientActor: Master removed our application: FAILED; stopping client
14/06/04 12:56:09 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
14/06/04 12:56:09 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:164)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:149)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
at scala.collection.AbstractIterator.toList(Iterator.scala:1157)
at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
As of September 1st, 2014, this is an open "Improvement" issue in Spark; please see https://issues.apache.org/jira/browse/SPARK-3052. As syrza pointed out in that issue, the shutdown hooks are likely run in the wrong order when an executor fails, which results in this message. You will have to do a little more investigation to find the root cause of the problem, i.e. why your executor failed. If it is a large shuffle, it might be an out-of-memory error that caused the executor to fail, which in turn caused the Hadoop FileSystem to be closed in its shutdown hook. The RecordReaders in the tasks still running on that executor then throw the "java.io.IOException: Filesystem closed" exception. I guess it will be fixed in a subsequent release, and then you will get a more helpful error message :)
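The sequence described above can be imitated with a toy model in plain Python. This is only a sketch of the suspected mechanism, not Hadoop's or Spark's actual code: FakeFileSystem, executor_shutdown_hook and simulate are hypothetical names, and the "executor failure" is simply triggered by hand partway through the read loop.

```python
class FakeFileSystem:
    """Toy stand-in for a shared filesystem client (not Hadoop's API)."""

    def __init__(self, lines):
        self._lines = list(lines)
        self._open = True

    def close(self):
        self._open = False

    def read_line(self, index):
        # Mirrors the idea of an "is it still open?" check before every read.
        if not self._open:
            raise IOError("Filesystem closed")
        return self._lines[index]


def simulate(fail_before_index):
    """Read three records; the executor starts shutting down partway through."""
    fs = FakeFileSystem(["r0", "r1", "r2"])

    def executor_shutdown_hook():
        # Runs because the executor is going down for some *other* reason
        # (for example, running out of memory); it closes the shared client.
        fs.close()

    observed, error_message = [], None
    try:
        for i in range(3):
            if i == fail_before_index:
                executor_shutdown_hook()
            # The task is still running and keeps reading from the closed client.
            observed.append(fs.read_line(i))
    except IOError as exc:
        error_message = str(exc)
    return observed, error_message


observed, error_message = simulate(fail_before_index=1)
```

In this model the only error the reading task ever reports is "Filesystem closed"; whatever triggered the shutdown never shows up in its stack trace. That is why the executor's own logs, rather than this exception, are the place to look for the real cause.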