Spark not able to find checkpointed data in HDFS after executor fails


Problem description

I am streaming data from Kafka as below:

    final JavaPairDStream<String, Row> transformedMessages =
            rtStream
                    .mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))
                    .mapWithState(StateSpec.function(updateDataFunc).numPartitions(32))
                    .stateSnapshots();

    transformedMessages.foreachRDD(rdd -> {
        // logic goes here
    });

I have four worker threads and multiple executors for this application, and I am trying to check Spark's fault tolerance.

Since we are using mapWithState, Spark checkpoints data to HDFS, so if any executor/worker goes down we should be able to recover the lost data (the data held by the dead executor) and continue with the remaining executors/workers.
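For reference, a minimal sketch of how the checkpoint directory is typically wired up for a mapWithState job; the app name and batch interval below are assumptions rather than the actual job configuration (the HDFS path is the one that appears in the error message further down):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Assumed configuration: the app name and batch interval are placeholders.
    SparkConf conf = new SparkConf().setAppName("kafka-state-demo");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));

    // mapWithState requires a checkpoint directory; the state snapshots behind
    // stateSnapshots() are periodically written under this path.
    jssc.checkpoint("hdfs://mycluster/user/user1/sparkCheckpointData");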

So I killed one of the worker nodes to see whether the application still runs smoothly, but instead I get a FileNotFound exception from HDFS, as below:

This is a bit odd: Spark checkpointed this data to HDFS at some point, so why is it not able to find it? HDFS is obviously not deleting any data, so why this exception?

Or am I missing something here?

[ERROR] 2018-08-21 13:07:24,067 org.apache.spark.streaming.scheduler.JobScheduler logError - Error running job streaming job 1534871220000 ms.2
                org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
                java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
                        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
                at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
                at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
                at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
                at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
                at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
                at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
                at scala.Option.map(Option.scala:146)
                at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:273)
                at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1615)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1626)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1625)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1623)

Further update: I found that the RDD that Spark is trying to find in HDFS had already been deleted by the "ReliableRDDCheckpointData" process, which created a new RDD for the checkpoint data. The DAG is somehow still pointing to this old RDD. Had there been any live reference to this data, it should not have been deleted.

Recommended answer

Consider this pipeline of transformations on a Spark stream:

    rtStream
            .mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))
            .mapWithState(StateSpec.function(updateDataFunc).numPartitions(32))
            .stateSnapshots()
            .foreachRDD(rdd -> {
                if (counter == 1) {
                    // convert the RDD to a Dataset and register it as a SQL table named "InitialDataTable"
                } else {
                    // convert the RDD to a Dataset and register it as a SQL table named "ActualDataTable"
                }
            });
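For context, a rough sketch of what those placeholder comments could correspond to, using the transformedMessages stream from the snippet in the question. The SparkSession handle, the snapshot schema (its field names are made up here) and the batch counter are assumptions, since that part of the code is not shown:

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    SparkSession spark = SparkSession.builder().getOrCreate();

    // Assumed schema for the state Rows; the real fields come from GenericDataModel.
    StructType snapshotSchema = new StructType()
            .add("id", DataTypes.StringType)
            .add("value", DataTypes.StringType);

    // Stands in for the "counter" above; foreachRDD bodies run on the driver,
    // so this is a driver-side count of the batches seen so far.
    AtomicLong batchCounter = new AtomicLong();

    transformedMessages.foreachRDD(rdd -> {
        JavaRDD<Row> rows = rdd.values();  // drop the keys, keep the state Rows
        Dataset<Row> ds = spark.createDataFrame(rows, snapshotSchema);
        if (batchCounter.incrementAndGet() == 1) {
            ds.createOrReplaceTempView("InitialDataTable");
        } else {
            ds.createOrReplaceTempView("ActualDataTable");
        }
    });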

mapWithState comes with automatic checkpointing of the state data after every batch, so each "rdd" in the foreachRDD block above is checkpointed, and each checkpoint overwrites the previous one (because, obviously, only the latest state needs to stay in the checkpoint).

But suppose the user is still using rdd number 1; in my case I register the very first rdd as one table and every other rdd as a different table, so it should not be overwritten. (It is the same as in Java: as long as something still holds a reference to an object, that object is not eligible for garbage collection.)

Now, when I try to access the table "InitialDataTable", the "rdd" that was used to create it is obviously no longer in memory, so Spark goes to HDFS to recover it from the checkpoint, and it does not find it there either, because it was overwritten by the very next rdd, and the Spark application stops, citing the reason:

"org.apache.spark.SparkException:由于阶段失败而导致作业中止:任务创建失败:java.io.FileNotFoundException:文件不存在:hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000"

"org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000"

So to resolve this issue I had to checkpoint the very first rdd explicitly.
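A hedged sketch of that fix, assuming the same foreachRDD structure and batch counter as in the sketch above. Calling cache() and checkpoint() on the first snapshot and then forcing an action writes that RDD to its own reliable checkpoint files, instead of leaving it to the state checkpoint that mapWithState keeps overwriting (this also assumes the checkpoint directory configured on the streaming context is in effect for RDD checkpoints):

    transformedMessages.foreachRDD(rdd -> {
        if (batchCounter.incrementAndGet() == 1) {
            rdd.cache();       // keep the data around so the checkpoint write does not recompute it
            rdd.checkpoint();  // mark this RDD for its own reliable checkpoint in HDFS
            rdd.count();       // the checkpoint is only written once an action runs
            // ... convert to a Dataset and register "InitialDataTable" as before ...
        } else {
            // ... convert to a Dataset and register "ActualDataTable" as before ...
        }
    });

With the first RDD pinned to its own checkpoint files, the later state checkpoints can be rotated without invalidating the lineage behind "InitialDataTable".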
