What are possible reasons for receiving TimeoutException: Futures timed out after [n seconds] when working with Spark


Problem description

I'm working on a Spark SQL program and I'm receiving the following exception:

16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
    at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581)
    at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590)
    at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
    at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32)
    at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52)
    at com.somecompany.ml.modeling.lowForNewModel.run(FlowForNewModel.scala:15)
    at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
    at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
    at scala.Option.getOrElse(Option.scala:121)
    at com.somecompany.ml.Main$.main(Main.scala:46)
    at com.somecompany.ml.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds])

The last part of my code that I recognize from the stack trace is com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56), which points me to this line: profilesDF.cache(). Before the caching I perform a union between two DataFrames. I've seen an answer here about persisting both DataFrames before the join, but I still need to cache the unioned DataFrame since I use it in several of my transformations.
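For context, a minimal sketch of the pattern described above, assuming two already-loaded DataFrames; every name except profilesDF is hypothetical, since only profilesDF.cache() is visible in the stack trace:

// Hypothetical reconstruction of the step at FlowForNewModel.scala:56:
// two DataFrames are unioned and the result is cached because it is
// reused by several later transformations.
val profilesDF = existingProfilesDF.unionAll(newProfilesDF)  // Spark 1.x DataFrame union API
profilesDF.cache()   // the line the stack trace points to

// The first action materializes the cache; a broadcast join inside the
// cached plan is what actually hits the timeout.
profilesDF.count()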

I was wondering what may cause this exception to be thrown? Searching for it led me to a link dealing with an RPC timeout exception and to some security issues, neither of which is my problem. If you also have any idea how to solve it I'd obviously appreciate it, but even just understanding the problem would help me solve it.

Thanks in advance

Recommended answer

Question: I was wondering what may cause this exception to be thrown?

Answer:

spark.sql.broadcastTimeout 300 — Timeout in seconds for the broadcast wait time in broadcast joins.

spark.network.timeout 120s — Default timeout for all network interactions. spark.network.timeout (spark.rpc.askTimeout), spark.sql.broadcastTimeout, spark.kryoserializer.buffer.max (if you are using Kryo serialization), etc. are tuned with larger-than-default values in order to handle complex queries. You can start with these values and adjust them according to your SQL workloads.
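As an illustration, these properties can be raised either via spark-submit --conf or when building the context. The property names are the real Spark settings; the values and the app name below are arbitrary placeholders, not recommendations:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Raise the relevant timeouts before any query runs; tune the values
// against your own workload.
val conf = new SparkConf()
  .setAppName("NewModelFlow")                       // hypothetical app name
  .set("spark.sql.broadcastTimeout", "1200")        // seconds; default is 300
  .set("spark.network.timeout", "600s")             // default is 120s
  .set("spark.kryoserializer.buffer.max", "512m")   // only if Kryo serialization is used

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

The same settings can be passed on the command line, e.g. spark-submit --conf spark.sql.broadcastTimeout=1200 --conf spark.network.timeout=600s ...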

Note: the documentation says

The following options (see the spark.sql.* properties) can also be used to tune the performance of query execution. It is possible that these options will be deprecated in a future release as more optimizations are performed automatically.

Also, for a better understanding you can look at BroadcastHashJoin, where the execute method is the trigger point for the above stack trace.

protected override def doExecute(): RDD[Row] = {
  // Blocks until the broadcast side has been built and shipped, or throws
  // TimeoutException once the timeout (spark.sql.broadcastTimeout) expires.
  val broadcastRelation = Await.result(broadcastFuture, timeout)

  streamedPlan.execute().mapPartitions { streamedIter =>
    // Probe the broadcast hash relation with the streamed side of the join.
    hashJoin(streamedIter, broadcastRelation.value)
  }
}
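To see why the message reads "Futures timed out after [n seconds]", here is a minimal standalone sketch, unrelated to Spark itself, of what Await.result does when the future is not ready in time:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object TimeoutDemo extends App {
  // A future that takes longer than the wait time we allow below.
  val slow = Future { Thread.sleep(5000); 42 }

  // Throws java.util.concurrent.TimeoutException: Futures timed out after [1 second],
  // just like BroadcastHashJoin when building the broadcast side takes longer
  // than spark.sql.broadcastTimeout.
  val result = Await.result(slow, 1.second)
  println(result)
}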

