使用前已移除Spark RDD块 [英] Spark RDD Block Removed Before Use

查看:116
本文介绍了使用前已移除Spark RDD块的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Future对RDD执行阻塞操作,如下所示:

I am using a Future to perform a blocking operation on an RDD like this:

dStreams.foreach(_.foreachRDD { rdd =>

  Future{ writeRDD(rdd) }

})

有时我会收到此错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[820] at actorStream at Tests.scala:149 after its blocks have been removed!

Spark似乎很难知道何时应删除此RDD.

It seems like Spark is having trouble knowing when this RDD should be deleted.

为什么会这样?解决方法是什么?

Why is this happening and what is the solution?

更新:

我认为RDD在使用之前可能已经被GC了.到目前为止,唯一可行的解​​决方案涉及设置

I think RDDs might be GC'd before they are used. The only working solution so far involves setting

conf.set("spark.streaming.unpersist", "false") 

unpersist()-手动

完整堆栈跟踪,以防出现错误:

Full stack trace in case this is a bug:

15/10/12 23:57:23 ERROR org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[765] at actorStream at NxCoreSparkTests.scala:168 after its blocks have been removed!
org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:250)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1394)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1368)
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:829)
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:827)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.AbstractTraversable.map(Traversable.scala:105)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:827)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:836)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:927)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:927)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
    at vscan.NxCoreSparkDbUtil$.writeToParquetByDay(NxCoreSparkTapeReader.scala:210)
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(NxCoreSparkTests.scala:190)
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(NxCoreSparkTests.scala:188)
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(NxCoreSparkTests.scala:188)
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:217)
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:219)
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:219)
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:219)
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:219)
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1.apply$mcV$sp(NxCoreSparkTests.scala:188)
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1.apply(NxCoreSparkTests.scala:185)
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1.apply(NxCoreSparkTests.scala:185)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[765] at actorStream at NxCoreSparkTests.scala:168 after its blocks have been removed!
    at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
    at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:250)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1394)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1368)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:829)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:827)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:827)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/10/12 23:57:24 ERROR org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer: Job job_201510122357_0000 aborted.

推荐答案

我认为问题在于,在 writeRDD(rdd)内部的代码执行之前(因为它位于 Future中),则Apache Spark内存管理或 BlockManager 已回收rdd(或微型批处理RDD).

I think the problem is that, before your code inside writeRDD(rdd) executes ( since it is in a Future ), the rdd ( or the micro-batch RDD ) is already reclaimed by Apache Spark memory-management or BlockManager.

因此,此错误

<代码>org.apache.spark.SparkException:由于阶段失败而导致作业中止:任务创建失败:org.apache.spark.SparkException:尝试在删除其块之后在Tests.scala:149的actorStream上使用BlockRDD [820]!

您可以通过以下方法解决此问题:首先收集微批处理集合,然后将其传递给 writeRDD()函数.像这样:

You can fix this by first collecting the micro-batch collection and then passing it to writeRDD() function. Something like this:

dStreams.foreach(_.foreachRDD { rdd =>

  val coll = rdd.collect()
  Future{ writeCollection(coll) }

})

这篇关于使用前已移除Spark RDD块的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆