Why does my Spark job fail to write output?
Question
I have a Spark job running on a distributed Spark cluster with 10 nodes. I am doing some text-file processing on HDFS. The job runs fine until the last step: saving the output as text files.
I get the following stack trace:
15/04/07 11:32:11 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:235, took 1.377335791 s
Exception in thread "main" java.io.IOException: Failed to rename RawLocalFileStatus{path=file:/home/ds_myuser/tmp/myapp/out/_temporary/0/task_201504071132_0016_m_000003/part-00003; isDirectory=false; length=2494; replication=1; blocksize=33554432; modification_time=1428427931000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/home/ds_myuser/tmp/myapp/out/part-00003
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:792)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1162)
at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:440)
at org.apache.spark.api.java.JavaPairRDD.saveAsTextFile(JavaPairRDD.scala:45)
at com.somecompany.analysis.myapp.control.Main.calculateRow(Main.java:235)
at com.somecompany.analysis.myapp.control.Main.calculatemyapp(Main.java:127)
at com.somecompany.analysis.myapp.control.Main.main(Main.java:103)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Also, this is how I am saving to file in my Java code:
result.saveAsTextFile("/home/myuser/tmp/myapp/out");
Also, sometimes I get one part file in the output directory, sometimes none.
Is it because I am trying to save to the local file system, and there is a race condition because all executors are trying to write to the same location? But the part-file names are different, so I guess that should not be the issue.
Any help is much appreciated.
One more thing I noticed: strangely enough, some of the temporary files are owned by "root", and I cannot delete them:
[myuser@myserver ~]$ rm -rf tmp/myapp/
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/.part-00001.crc': Permission denied
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/part-00001': Permission denied
EDIT 2:
As suggested by Marius Soutier, I tried using coalesce, and also tried repartition. With these changes the job succeeds, but in the output directory I see only the _SUCCESS file, no part-xxxxx files. Also, I am calling result.count() just before the coalesce or repartition, and it prints 260, so there is some final output. But it is not getting written to part files.
Here is my code which writes the file; it is in the driver class:
System.out.println("Final No. of Output Lines: " + result.count());
result.coalesce(1, true).saveAsTextFile("file:///home/myuser/tmp3");
Here is the log after the count is printed:
Final No. of Output Lines: 260
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/04/09 11:30:07 INFO spark.SparkContext: Starting job: saveAsTextFile at Main.java:284
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 174 bytes
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Registering RDD 23 (coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Got job 9 (saveAsTextFile at Main.java:284) with 1 output partitions (allowLocal=false)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Final stage: Stage 21(saveAsTextFile at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Missing parents: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284), which has no missing parents
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(22392) called with curMem=132730821, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17 stored as values in memory (estimated size 21.9 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(11900) called with curMem=132753213, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17_piece0 stored as bytes in memory (estimated size 11.6 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode111.mydomain.com:34468 (size: 11.6 KB, free: 5.2 GB)
15/04/09 11:30:07 INFO storage.BlockManagerMaster: Updated info of block broadcast_17_piece0
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.TaskSchedulerImpl: Adding task set 26.0 with 4 tasks
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 26.0 (TID 36, mynode117.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 26.0 (TID 37, mynode112.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 26.0 (TID 38, mynode115.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 26.0 (TID 39, mynode119.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode115.mydomain.com:51126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode117.mydomain.com:33052 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode115.mydomain.com:34724
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode117.mydomain.com:35651
15/04/09 11:30:07 INFO network.ConnectionManager: Accepted connection from [mynode112.mydomain.com/10.211.26.212:52476]
15/04/09 11:30:07 INFO network.SendingConnection: Initiating connection to [mynode112.mydomain.com/10.211.26.212:56453]
15/04/09 11:30:07 INFO network.SendingConnection: Connected to [mynode112.mydomain.com/10.211.26.212:56453], 1 messages pending
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode119.mydomain.com:39126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode112.mydomain.com:56453 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 26.0 (TID 36) in 356 ms on mynode117.mydomain.com (1/4)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 26.0 (TID 38) in 362 ms on mynode115.mydomain.com (2/4)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode119.mydomain.com:42604
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode112.mydomain.com:46239
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 26.0 (TID 37) in 796 ms on mynode112.mydomain.com (3/4)
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 26.0 (TID 39) in 829 ms on mynode119.mydomain.com (4/4)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 26 (coalesce at Main.java:284) finished in 0.835 s
15/04/09 11:30:08 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 26.0, whose tasks have all completed, from pool
15/04/09 11:30:08 INFO scheduler.DAGScheduler: running: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: waiting: Set(Stage 21)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: failed: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Missing parents for Stage 21: List()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284), which is now runnable
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(53664) called with curMem=132765113, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18 stored as values in memory (estimated size 52.4 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(19192) called with curMem=132818777, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 18.7 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode111.mydomain.com:34468 (size: 18.7 KB, free: 5.2 GB)
15/04/09 11:30:08 INFO storage.BlockManagerMaster: Updated info of block broadcast_18_piece0
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284)
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Adding task set 21.0 with 1 tasks
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 21.0 (TID 40, mynode112.mydomain.com, ANY, 1353 bytes)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode112.mydomain.com:56453 (size: 18.7 KB, free: 2.1 GB)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 4 to sparkExecutor@mynode112.mydomain.com:46239
15/04/09 11:30:08 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 4 is 199 bytes
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 21.0 (TID 40) in 441 ms on mynode112.mydomain.com (1/1)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 21 (saveAsTextFile at Main.java:284) finished in 0.447 s
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool
15/04/09 11:30:08 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:284, took 1.381897276 s
[myuser@mynode111 ~]$ ls tmp3/
_SUCCESS
[myuser@mynode111 ~]$
A side note: I am doing the processing on HDFS and expecting the final output file after saveAsTextFile() to land on the local FS of the machine from which I launch the app. I hope Spark is not writing it somewhere else (the local FS of some other node).
Quick update:
I tried writing to HDFS instead of the local FS, and it works fine:
result.coalesce(1, true).saveAsTextFile("hdfs://mynode20.mydomain.com:8020/user/myuser/tmp");
Output:
[myuser@mynode111 ~]$ hadoop fs -ls /user/myuser/tmp
Found 2 items
-rw-r--r-- 3 myuser myuser 0 2015-04-09 11:53 /user/myuser/tmp/_SUCCESS
-rw-r--r-- 3 myuser myuser 12470 2015-04-09 11:53 /user/myuser/tmp/part-00000
[myuser@mynode111 ~]$
Answer
I had the same issue. It turned out that my Spark workers were running as the root user, while my job was running as another user. When saveAsTextFile is called, the Spark worker first saves the data to a temporary location on disk as root; then the Spark job, running as a different user, tries to move the root-owned temporary data to the final location and runs into a permission problem.
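To illustrate the mechanism (a stdlib-only sketch with hypothetical paths, not Spark's actual implementation): Hadoop's FileOutputCommitter writes each task's output under a `_temporary` subdirectory and then, at job commit, renames it into the final location, as in the `mergePaths` frames of the stack trace above. It is this final rename that fails when the temporary file is owned by root and the committing process runs as another user.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

public class CommitSketch {
    public static void main(String[] args) throws IOException {
        // Step 1: a task attempt writes its part file under _temporary.
        Path outDir = Paths.get("/tmp/commit-sketch/out"); // hypothetical path
        Path tmpPart = outDir.resolve("_temporary/0/task_0/part-00000");
        Files.createDirectories(tmpPart.getParent());
        Files.write(tmpPart, Arrays.asList("data"));

        // Step 2: at commit time, the part file is renamed into place.
        // This is the move that raises "Failed to rename ..." when the
        // temporary file belongs to root and the job runs as another user.
        Path finalPart = outDir.resolve("part-00000");
        Files.move(tmpPart, finalPart, StandardCopyOption.REPLACE_EXISTING);
        System.out.println("committed: " + Files.exists(finalPart));
    }
}
```

The practical fix is to make the worker processes and the submitting job run as the same user (or at least users that can rename each other's files), so the commit-time rename succeeds.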