In which situations are the stages of DAG skipped?


Question

I am trying to find the situations in which Spark would skip stages when I am using RDDs. I know that it will skip stages if a shuffle operation is involved. So, I wrote the following code to see if that is true:

import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("demo")
    val sc   = new SparkContext(conf)

    // The same pair RDD feeds both sides of every join below.
    val d = sc.parallelize(0 until 1000000).map(i => (i % 100000, i))

    // Three actions, each triggering a job with a different join type.
    val c = d.rightOuterJoin(d.reduceByKey(_ + _)).collect
    val f = d.leftOuterJoin(d.reduceByKey(_ + _)).collect
    val g = d.join(d.reduceByKey(_ + _)).collect
  }
}

On inspecting the Spark UI, I get the following jobs with their stages:

I was expecting stage 3 and stage 6 to be skipped, as these were using the same RDD to compute the required joins (given the fact that, in case of a shuffle, Spark automatically caches data). Can anyone please explain why I am not seeing any skipped stages here? How can I modify the code to see skipped stages? And are there any other situations (apart from shuffling) in which Spark is expected to skip stages?

Answer

It's actually quite simple.

In your case nothing can be skipped, as each Action uses a different JOIN type. Spark needs to scan d and d' to compute the result. Even with .cache (which you do not use, and which you should use to avoid recomputing all the way back to the source on each Action), this would make no difference.

Look at this simplified version:

val d = sc.parallelize(0 until 100000).map(i => (i % 10000, i)).cache // or not cached, does not matter

val c = d.rightOuterJoin(d.reduceByKey(_ + _))
val f = d.leftOuterJoin(d.reduceByKey(_ + _))

c.count   // all stages computed, shuffle output written
c.collect // skipped, shuffled: reuses the shuffle output of c.count
f.count   // all stages computed, shuffle output written
f.collect // skipped, shuffled: reuses the shuffle output of f.count

This shows the following Jobs for this App:

(4) Spark Jobs
Job 116 View(Stages: 3/3)
Job 117 View(Stages: 1/1, 2 skipped)
Job 118 View(Stages: 3/3)
Job 119 View(Stages: 1/1, 2 skipped)

You can see that successive Actions based on the same shuffle output cause one or more Stages to be skipped for the second Action / Job on val c or val f. That is to say, the join type for c and for f is known, and the two Actions on the same join run sequentially, profiting from the prior work: the second Action can rely on the shuffle output of the first Action, which is directly applicable to it. That simple.
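Applied back to the code from the question, here is a minimal sketch (the object name SkippedStagesDemo and the app name are illustrative, not from the original post) of one way to provoke skipped stages: keep a single join type and evaluate it with two separate Actions, exactly as in the count/collect pairs above, so the second Job can reuse the shuffle files written by the first.

import org.apache.spark.{SparkConf, SparkContext}

object SkippedStagesDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("skipped-stages-demo")
    val sc   = new SparkContext(conf)

    val d = sc.parallelize(0 until 1000000).map(i => (i % 100000, i))

    // One join type only.
    val joined = d.join(d.reduceByKey(_ + _))

    joined.count   // Job 1: all stages run, shuffle output is written
    joined.collect // Job 2: the shuffle-map stages show up as "skipped",
                   // because the shuffle output of Job 1 is reused

    sc.stop()
  }
}

The same effect appears whenever an Action is repeated over the same lineage: as long as the shuffle output of the earlier Job is still available, the corresponding shuffle-map stages are skipped.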
