What are possible reasons for receiving TimeoutException: Futures timed out after [n seconds] when working with Spark
Problem description
I'm working on a Spark SQL program and I'm receiving the following exception:
16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581)
at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590)
at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32)
at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52)
at com.somecompany.ml.modeling.lowForNewModel.run(FlowForNewModel.scala:15)
at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
at scala.Option.getOrElse(Option.scala:121)
at com.somecompany.ml.Main$.main(Main.scala:46)
at com.somecompany.ml.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds])
The last part of my code that I recognize from the stack trace is com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56), which gets me to this line: profilesDF.cache()
Before the caching I perform a union between two DataFrames. I've seen an answer about persisting both DataFrames before the join here, but I still need to cache the unioned DataFrame, since I'm using it in several of my transformations.
I was wondering what may cause this exception to be thrown. Searching for it led me to a link dealing with an RPC timeout exception, and to some security issues, neither of which is my problem. If you also have any idea how to solve it I'd obviously appreciate it, but even just understanding the problem would help me solve it.
Thanks in advance
Recommended answer
Question: I was wondering what may cause this exception to be thrown?
Answer:

spark.sql.broadcastTimeout (default: 300): timeout in seconds for the broadcast wait time in broadcast joins.
spark.network.timeout (default: 120s): default timeout for all network interactions.

spark.network.timeout (spark.rpc.askTimeout), spark.sql.broadcastTimeout, spark.kryoserializer.buffer.max (if you are using Kryo serialization), etc. are tuned with larger-than-default values in order to handle complex queries. You can start with these values and adjust them according to your SQL workloads.
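For illustration, the properties above can be raised when the application is configured. This is only a sketch; the values below are arbitrary starting points, not recommendations, and should be tuned to your own workload:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only; tune them to your workload.
val conf = new SparkConf()
  .setAppName("FlowForNewModel")
  .set("spark.sql.broadcastTimeout", "3600")      // seconds to wait for the broadcast side of a join
  .set("spark.network.timeout", "600s")           // default for all network interactions
  .set("spark.kryoserializer.buffer.max", "512m") // only relevant with Kryo serialization
val sc = new SparkContext(conf)
```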
Note: the docs say
The following options (see the spark.sql.* properties) can also be used to tune the performance of query execution. It is possible that these options will be deprecated in a future release as more optimizations are performed automatically.
Also, for a better understanding, you can look at BroadcastHashJoin, whose execute method is the trigger point for the above stack trace.
protected override def doExecute(): RDD[Row] = {
  // Blocks until the broadcast side is built; throws TimeoutException
  // once spark.sql.broadcastTimeout is exceeded.
  val broadcastRelation = Await.result(broadcastFuture, timeout)
  streamedPlan.execute().mapPartitions { streamedIter =>
    hashJoin(streamedIter, broadcastRelation.value)
  }
}
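The exception itself is just scala.concurrent.Await.result giving up. A minimal, Spark-free sketch (the object name is mine) reproduces the same TimeoutException by waiting on a future that never completes, which mirrors a broadcast side that takes longer than the configured wait time to materialize:

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object BroadcastTimeoutSketch {
  // Await.result on a future that is never completed mirrors what
  // BroadcastHashJoin.doExecute does when the broadcast side takes
  // longer than spark.sql.broadcastTimeout to build.
  def timesOut(waitMillis: Long): Boolean =
    try {
      Await.result(Promise[Int]().future, waitMillis.millis)
      false
    } catch {
      case _: TimeoutException => true // "Futures timed out after [...]"
    }
}

println(s"timed out: ${BroadcastTimeoutSketch.timesOut(100)}")
```

Raising spark.sql.broadcastTimeout only widens this wait; if the broadcast side is simply too large, disabling broadcast joins (spark.sql.autoBroadcastJoinThreshold = -1) avoids this code path entirely.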