How DAG works under the covers in RDD?

Question

The Spark research paper describes a new distributed programming model on top of classic Hadoop MapReduce, claiming simplification and a vast performance boost in many cases, especially for machine learning. However, the paper seems to lack material uncovering the internal mechanics of Resilient Distributed Datasets with the Directed Acyclic Graph.

Would it be better to learn this by investigating the source code?

Answer

I too have been searching the web to learn how Spark computes the DAG from the RDD and subsequently executes the tasks.

At a high level, when any action is called on the RDD, Spark creates the DAG and submits it to the DAG scheduler.
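As a minimal sketch (assuming a SparkContext named sc and a small text file such as the log.txt used later in this answer), the following shows that transformations only add nodes to the DAG, while the action is what actually submits a job:

// Transformations are lazy: they only build up the DAG, nothing runs yet.
val lines = sc.textFile("log.txt")               // HadoopRDD -> MapPartitionsRDD
val warns = lines.filter(_.startsWith("WARN"))   // another node in the DAG
// The action triggers the job: the DAG is submitted to the DAG scheduler here.
val warnCount = warns.count()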

  • The DAG scheduler divides the operators into stages of tasks. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together; for example, many map operators can be scheduled in a single stage. The final result of the DAG scheduler is a set of stages.

  • The stages are passed on to the task scheduler. The task scheduler launches tasks via the cluster manager (Spark Standalone/YARN/Mesos). The task scheduler doesn't know about the dependencies between stages.

  • The worker executes the tasks on the slave.

Let's come to how Spark builds the DAG.

At a high level, there are two kinds of transformations that can be applied to RDDs, namely narrow transformations and wide transformations. Wide transformations result in stage boundaries.

Narrow transformation - doesn't require the data to be shuffled across the partitions. For example, map, filter, etc.

Wide transformation - requires the data to be shuffled, for example, reduceByKey etc.
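A short sketch of the difference, reusing the same sc and log.txt assumed above: map and filter are narrow and get pipelined into a single stage, while reduceByKey forces a shuffle and therefore a stage boundary:

val pairs = sc.textFile("log.txt")
              .map(line => (line.split(" ")(0), 1))  // narrow: stays within each partition
              .filter(_._1.nonEmpty)                 // narrow: pipelined with the map above
val counts = pairs.reduceByKey(_ + _)                // wide: shuffle => new stage
// The ShuffledRDD in the lineage marks where the second stage begins.
println(counts.toDebugString)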

Let's take the example of counting how many log messages appear at each severity level.

The following is the log file, where each line starts with the severity level:

INFO I'm Info message
WARN I'm a Warn message
INFO I'm another Info message

and the following Scala code is used to extract the counts:

val input = sc.textFile("log.txt")                     // read the log file into an RDD of lines
val splitedLines = input.map(line => line.split(" "))  // split each line into words
                        .map(words => (words(0), 1))   // key by the severity level (first word)
                        .reduceByKey{(a,b) => a + b}   // count occurrences per severity (wide transformation)
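Note that nothing has executed yet; the lines above only define the lineage. As a small follow-up sketch, calling an action such as collect() is what triggers the job and materialises the per-severity counts (output order may vary):

splitedLines.collect().foreach(println)   // e.g. (INFO,2) and (WARN,1) for the log file above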

This sequence of commands implicitly defines a DAG of RDD objects (the RDD lineage) that will be used later when an action is called. Each RDD maintains a pointer to one or more parents, along with metadata about what type of relationship it has with each parent. For example, when we call val b = a.map() on an RDD, the RDD b keeps a reference to its parent a; that's the lineage.
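To make the parent-pointer idea concrete, here is a small sketch (an illustration added here, not from the original answer) that inspects those relationships through the public dependencies method on an RDD; a narrow transformation shows up as a OneToOneDependency, a shuffle as a ShuffleDependency:

val a = sc.parallelize(1 to 10)
val b = a.map(_ * 2)                               // b keeps a reference to its parent a
val c = b.map(x => (x % 3, x)).reduceByKey(_ + _)  // c's lineage crosses a shuffle
println(b.dependencies)   // typically a single OneToOneDependency on a
println(c.dependencies)   // typically a single ShuffleDependency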

To display the lineage of an RDD, Spark provides a debug method toDebugString(). For example, executing toDebugString() on the splitedLines RDD will output the following:

(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
    +-(2) MapPartitionsRDD[5] at map at <console>:24 []
    |  MapPartitionsRDD[4] at map at <console>:23 []
    |  log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
    |  log.txt HadoopRDD[0] at textFile at <console>:21 []

The first line (from the bottom) shows the input RDD. We created this RDD by calling sc.textFile().

Once the DAG is built, the Spark scheduler creates a physical execution plan. As mentioned above, the DAG scheduler splits the graph into multiple stages, and the stages are created based on the transformations. Narrow transformations are grouped (pipelined) together into a single stage. So for our example, Spark creates a two-stage execution: one stage for the pipelined textFile and map operations, and a second stage for reduceByKey after the shuffle.

The DAG scheduler then submits the stages to the task scheduler. The number of tasks submitted depends on the number of partitions present in textFile. For example, if there are 4 partitions in this example, then 4 sets of tasks will be created and submitted in parallel, provided there are enough slaves/cores.
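As a rough sketch of that relationship (the 4-partition figure is just this example's assumption), you can check how many tasks each stage would run by looking at the partition counts:

val input4 = sc.textFile("log.txt", minPartitions = 4)
println(input4.getNumPartitions)     // typically 4 -> the first stage runs 4 tasks
val counts4 = input4.map(line => (line.split(" ")(0), 1))
                    .reduceByKey(_ + _, numPartitions = 2)
println(counts4.getNumPartitions)    // 2 -> the post-shuffle stage runs 2 tasks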

For more detailed information, I suggest you go through the following YouTube videos, in which Spark creators give in-depth details about the DAG, the execution plan, and the lifecycle.

  1. Advanced Apache Spark - Sameer Farooqui (Databricks)
  2. A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)
  3. Introduction to AmpLab Spark Internals
