Spark fails on big shuffle jobs with java.io.IOException: Filesystem closed


Problem description






I often find Spark fails with large jobs with a rather unhelpful, meaningless exception. The worker logs look normal, no errors, but they get state "KILLED". This is extremely common for large shuffles, so for operations like .distinct.

The question is, how do I diagnose what's going wrong, and ideally, how do I fix it?

Given that a lot of these operations are monoidal, I've been working around the problem by splitting the data into, say, 10 chunks, running the app on each chunk, and then running the app on all of the resulting outputs. In other words: meta-map-reduce.
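
For illustration, here is a minimal Scala sketch of that meta-map-reduce workaround, assuming the input has already been split into per-chunk HDFS directories; the paths, master URL, chunk count, and the use of .distinct are all made up for the example.

import org.apache.spark.SparkContext

// Sketch of the "meta-map-reduce" workaround: run the (monoidal) job on each
// chunk separately, then run it once more over the per-chunk outputs.
// Paths, master URL, and chunk count are illustrative only.
object MetaMapReduce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("spark://master:7077", "meta-map-reduce")
    val numChunks = 10

    // Pass 1: deduplicate each chunk on its own and write the result out.
    for (i <- 0 until numChunks) {
      sc.textFile(s"hdfs:///data/input/chunk-$i")
        .distinct()
        .saveAsTextFile(s"hdfs:///data/intermediate/chunk-$i")
    }

    // Pass 2: deduplicate across all of the per-chunk outputs.
    sc.textFile("hdfs:///data/intermediate/chunk-*")
      .distinct()
      .saveAsTextFile("hdfs:///data/output")

    sc.stop()
  }
}

Each pass shuffles far less data than a single .distinct over the whole dataset, which is why this works around the failures, at the cost of an extra round of I/O.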

14/06/04 12:56:09 ERROR client.AppClient$ClientActor: Master removed our application: FAILED; stopping client
14/06/04 12:56:09 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
14/06/04 12:56:09 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:164)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:149)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
    at scala.collection.AbstractIterator.toList(Iterator.scala:1157)
    at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
    at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Solution

As of September 1st 2014, this is an "open improvement" in Spark. Please see https://issues.apache.org/jira/browse/SPARK-3052. As syrza pointed out in the given link, the shutdown hooks are likely run in the wrong order when an executor fails, which results in this message. I understand you will have to do a little more investigation to figure out the main cause of the problem (i.e. why your executor failed). If it is a large shuffle, it might be an out-of-memory error which causes the executor to fail, which then causes the Hadoop FileSystem to be closed in its shutdown hook. So the RecordReaders in the running tasks of that executor throw the "java.io.IOException: Filesystem closed" exception. I guess it will be fixed in a subsequent release and then you will get a more helpful error message :)
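
If a shuffle-time out-of-memory error does turn out to be the underlying cause, two commonly tried mitigations are giving executors more memory and spreading the shuffle over more, smaller partitions. A minimal sketch; the memory size, partition count, and paths are assumptions for illustration, not recommendations:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative mitigations for shuffle-time OOM: more executor memory and more
// shuffle partitions so each task handles less data. All values are examples.
val conf = new SparkConf()
  .setAppName("big-distinct")
  .set("spark.executor.memory", "8g")   // more heap per executor
val sc = new SparkContext(conf)

sc.textFile("hdfs:///data/input")
  .distinct(2000)                        // spread the shuffle over more tasks
  .saveAsTextFile("hdfs:///data/output")

Whether this helps depends on why the executor died in the first place, which is exactly what the executor logs (rather than the driver's "Filesystem closed" trace) should tell you.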

