Spark: Dataframe action really slow when upgraded from 2.1.0 to 2.2.1

Problem description

I just upgraded from Spark 2.1.0 to Spark 2.2.1. Has anyone seen extremely slow behavior on dataframe.filter(…).collect()? Specifically, a collect operation preceded by a filter. dataframe.collect seems to run okay, but dataframe.filter(…).collect() takes forever. The dataframe contains only 2 records, and this is in a unit test. When I go back to Spark 2.1.0, it is back to normal speed.
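For context, here is a minimal sketch of the pattern described above, written as a self-contained snippet that could sit in a unit test. The schema, data, and filter predicate are hypothetical (the original question does not show them), and since the slowdown depends on how complex the optimized plan gets, this is an illustration of the API pattern rather than a guaranteed reproduction.

    import org.apache.spark.sql.SparkSession

    // Local SparkSession, as typically used in a unit test.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("filter-collect-repro")   // hypothetical test name
      .getOrCreate()
    import spark.implicits._

    // A tiny two-row dataframe standing in for the real test data.
    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

    df.collect()                     // runs at normal speed on both versions
    df.filter($"id" > 1).collect()   // the call reported as extremely slow on 2.2.1

    spark.stop()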

I have looked at the thread dump and could not find an obvious cause. I have made an effort to make sure all the libraries I am using are also using Spark 2.2.1. Any suggestions would be greatly appreciated.

It seems to be stuck at this stack trace:

scala.collection.mutable.FlatHashTable$class.addEntry(FlatHashTable.scala:151)
scala.collection.mutable.HashSet.addEntry(HashSet.scala:40)
scala.collection.mutable.FlatHashTable$class.addElem(FlatHashTable.scala:142)
scala.collection.mutable.HashSet.addElem(HashSet.scala:40)
scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:59)
scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:40)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
scala.collection.mutable.AbstractSet.$plus$plus$eq(Set.scala:46)
scala.collection.mutable.HashSet.clone(HashSet.scala:83)
scala.collection.mutable.HashSet.clone(HashSet.scala:40)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:65)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:50)
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
scala.collection.TraversableOnce$class.$div$colon(TraversableOnce.scala:151)
scala.collection.AbstractTraversable.$div$colon(Traversable.scala:104)
scala.collection.SetLike$class.$plus$plus(SetLike.scala:141)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus$plus(ExpressionSet.scala:50)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$getAliasedConstraints$1.apply(LogicalPlan.scala:323)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$getAliasedConstraints$1.apply(LogicalPlan.scala:320)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode.getAliasedConstraints(LogicalPlan.scala:320)
org.apache.spark.sql.catalyst.plans.logical.Project.validConstraints(basicLogicalOperators.scala:65)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints$lzycompute(QueryPlan.scala:188) => holding Monitor(org.apache.spark.sql.catalyst.plans.logical.Aggregate@1129881457})
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints(QueryPlan.scala:188)
org.apache.spark.sql.catalyst.plans.logical.Aggregate.validConstraints(basicLogicalOperators.scala:555)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints$lzycompute(QueryPlan.scala:188) => holding Monitor(org.apache.spark.sql.catalyst.plans.logical.Aggregate@1129881457})
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints(QueryPlan.scala:188)
org.apache.spark.sql.catalyst.plans.QueryPlan.getConstraints(QueryPlan.scala:196)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16$$anonfun$25.apply(Optimizer.scala:717)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16$$anonfun$25.apply(Optimizer.scala:716)
scala.collection.TraversableLike$$anonfun$partition$1.apply(TraversableLike.scala:314)
scala.collection.TraversableLike$$anonfun$partition$1.apply(TraversableLike.scala:314)
scala.collection.immutable.List.foreach(List.scala:392)
scala.collection.TraversableLike$class.partition(TraversableLike.scala:314)
scala.collection.AbstractTraversable.partition(Traversable.scala:104)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16.applyOrElse(Optimizer.scala:716)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16.applyOrElse(Optimizer.scala:705)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
org.apache.spark.sql.catalyst.optimizer.PruneFilters.apply(Optimizer.scala:705)
org.apache.spark.sql.catalyst.optimizer.PruneFilters.apply(Optimizer.scala:704)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
scala.collection.immutable.List.foldLeft(List.scala:84)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1193326176})
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1193326176})
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1193326176})
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:2837)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2387)

Solution

Fixed it. The problem is related to the property spark.sql.constraintPropagation.enabled, whose default value is true in Spark 2.2.1. The stack trace indicates that it is stuck in constraint propagation while generating the query plan. I found my answer in this blog.

Short answer: set that property to false: spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)
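A sketch of the two common ways to apply this workaround (the application name below is hypothetical; the config key and the SQLConf constant are the ones referenced above):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    // Option 1: disable constraint propagation when the session is created.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("constraint-propagation-workaround")   // hypothetical name
      .config(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, "false")
      .getOrCreate()

    // Option 2: toggle it on an existing session, before running the slow query.
    spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)

    // Equivalent, using the raw key string instead of the SQLConf constant:
    spark.conf.set("spark.sql.constraintPropagation.enabled", "false")

Note that this turns the constraint-propagation optimization off for the whole session, so it trades a possibly less optimized plan for fast planning; it is a workaround rather than a fix.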
