收到TimeoutException的可能原因是什么:使用Spark时,期货在[n秒]之后超时 [英] What are possible reasons for receiving TimeoutException: Futures timed out after [n seconds] when working with Spark

查看:144
本文介绍了收到TimeoutException的可能原因是什么:使用Spark时,期货在[n秒]之后超时的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发Spark SQL程序,并且收到以下异常:

I'm working on a Spark SQL program and I'm receiving the following exception:

16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
    at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581)
    at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590)
    at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
    at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32)
    at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52)
    at com.somecompany.ml.modeling.lowForNewModel.run(FlowForNewModel.scala:15)
    at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
    at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
    at scala.Option.getOrElse(Option.scala:121)
    at com.somecompany.ml.Main$.main(Main.scala:46)
    at com.somecompany.ml.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds])

我从堆栈跟踪中识别出的代码的最后一部分是com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56),这使我进入以下行:profilesDF.cache() 在缓存之前,我在2个数据帧之间执行了合并.我已经看到了有关在连接

The last part of my code that I recognize from the stack trace is com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56) which gets me to this line: profilesDF.cache() Before the caching I perform a union between 2 dataframes. I've seen an answer about persisting both the dataframes before the join here I still need to cache the unioned dataframe since I'm using it in several of my transformations

我想知道是什么原因导致引发此异常? 搜索它使我进入一个处理rpc超时异常或某些安全问题的链接,这不是我的问题 如果您还对解决方法有任何想法,我将不胜感激,但是即使只是了解问题也会帮助我解决问题

And I was wondering what may cause this exception to be thrown? Searching for it got me to a link dealing with rpc timeout exception or some security issues which is not my problem If you also have any idea on how to solve it I'd obviously appreciate it but even just understanding the problem will help me solve it

预先感谢

推荐答案

问题:我想知道是什么原因导致抛出此异常?

Question : I was wondering what may cause this exception to be thrown?

答案:

spark.sql.broadcastTimeout 300广播超时(以秒为单位) 广播加入的等待时间

spark.sql.broadcastTimeout 300 Timeout in seconds for the broadcast wait time in broadcast joins

spark.network.timeout 120s所有网络交互的默认超时..spark.network.timeout (spark.rpc.askTimeout)spark.sql.broadcastTimeoutspark.kryoserializer.buffer.max(如果您使用的是Kryo 序列化等),以大于默认值的方式进行调整 为了处理复杂的查询.您可以从这些值开始 根据您的SQL工作负载进行相应调整.

spark.network.timeout 120s Default timeout for all network interactions.. spark.network.timeout (spark.rpc.askTimeout), spark.sql.broadcastTimeout, spark.kryoserializer.buffer.max(if you are using kryo serialization), etc. are tuned with larger-than-default values in order to handle complex queries. You can start with these values and adjust accordingly to your SQL workloads.

注意:医生说

以下选项(请参见spark.sql.属性)也可以用于调整查询执行的性能.这些选项可能会在以后的版本中弃用,因为会自动执行更多优化.*

The following options(see spark.sql. properties) can also be used to tune the performance of query execution. It is possible that these options will be deprecated in future release as more optimizations are performed automatically.*

此外,为使您更好地理解,您可以参阅

Also,for your better understanding you can see BroadCastHashJoin where execute method is trigger point for the above stack trace.

protected override def doExecute(): RDD[Row] = {
    val broadcastRelation = Await.result(broadcastFuture, timeout)

    streamedPlan.execute().mapPartitions { streamedIter =>
      hashJoin(streamedIter, broadcastRelation.value)
    }
  }

这篇关于收到TimeoutException的可能原因是什么:使用Spark时,期货在[n秒]之后超时的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆