How are stages split into tasks in Spark?


Question

Let's assume for the following that only one Spark job is running at every point in time.

Here is what I understand about what happens in Spark:

  1. When a SparkContext is created, each worker node starts an executor. Executors are separate processes (JVMs) that connect back to the driver program. Each executor has the driver program's jar. Quitting the driver shuts down the executors. Each executor can hold some partitions.
  2. When a job is executed, an execution plan is created according to the lineage graph.
  3. The job's execution is split into stages, where each stage contains as many neighbouring (in the lineage graph) transformations and actions as possible, but no shuffles. Thus stages are separated by shuffles.
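
For example (a minimal sketch of my understanding, assuming a live SparkContext sc and a placeholder input path): narrow transformations like flatMap and map stay within one stage, while reduceByKey needs a shuffle and therefore starts a new stage.

val lines = sc.textFile("/data/blah/input")               // narrow: part of the first stage
val pairs = lines.flatMap(_.split(" ")).map(w => (w, 1))  // still the first stage (no shuffle)
val counts = pairs.reduceByKey(_ + _)                     // shuffle => a second stage begins
counts.collect()                                          // the action triggers both stages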

I understand that

  • A task is a command sent from the driver to an executor by serializing the Function object.
  • The executor deserializes (with the driver jar) the command (task) and executes it on a partition.
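
A rough sketch of that (assuming a live SparkContext sc; the numbers are arbitrary): the function passed to map, together with whatever it captures, is what gets serialized into tasks, and each task runs it over one partition.

val factor = 3                                        // captured by the closure below
val scaled = sc.parallelize(1 to 100, numSlices = 4)  // 4 partitions
  .map(x => x * x * factor)                           // this function object is serialized and shipped
scaled.count()                                        // executors run it as 4 tasks, one per partition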

But

How do I split the stage into those tasks?

Specifically:

  1. Are the tasks determined by the transformations and actions, or can multiple transformations/actions be in one task?
  2. Are the tasks determined by the partitions (e.g. one task per stage per partition)?
  3. Are the tasks determined by the nodes (e.g. one task per stage per node)?

What I think (only a partial answer, even if right)

In https://0x0fff.com/spark-architecture-shuffle, the shuffle is explained with an image,

and I get the impression that the rule is

each stage is split into #number-of-partitions tasks, with no regard for the number of nodes

For my first image I'd say that I'd have 3 map tasks and 3 reduce tasks.

For the image from 0x0fff, I'd say there are 8 map tasks and 3 reduce tasks (assuming that there are only three orange and three dark green files).

Is that correct? But even if that is correct, my questions above are not all answered, because it is still open whether multiple operations (e.g. multiple maps) are within one task or are separated into one task per operation.
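
One way I can probe this on a concrete RDD (a sketch; the path is a placeholder) is toDebugString: its indentation marks the shuffle boundaries, and the (N) prefixes show the number of partitions, i.e. the number of tasks, of each stage.

val rdd = sc.textFile("/data/blah/input")  // map-side stage
  .map(line => (line.length, 1))
  .reduceByKey(_ + _)                      // shuffle => reduce-side stage
println(rdd.toDebugString)                 // e.g. "(2) ShuffledRDD ... +-(2) MapPartitionsRDD ..."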

"What is a task in Spark? How does the Spark worker execute the jar file?" and "How does the Apache Spark scheduler split files into tasks?" are similar questions, but I did not feel that my question was answered clearly there.

Answer

You have a pretty nice outline here. To answer your questions:

  • A separate task does need to be launched for each partition of data for each stage. Consider that each partition will likely reside on distinct physical locations - e.g. blocks in HDFS or directories/volumes for a local file system.
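
A minimal sketch of that rule (the path and the partition counts are placeholders, not measured values):

val logs = sc.textFile("/data/blah/input", minPartitions = 6) // at least 6 input partitions
logs.count()                       // this stage launches one task per input partition
val wider = logs.repartition(12)   // shuffle into 12 partitions
wider.count()                      // the post-shuffle stage launches 12 tasks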

Note that the submission of Stages is driven by the DAG Scheduler. This means that stages that are not interdependent may be submitted to the cluster for execution in parallel: this maximizes the parallelization capability on the cluster. So if operations in our dataflow can happen simultaneously we will expect to see multiple stages launched.

We can see this in action in the toy example below, in which we perform these types of operations:

  • load two datasources
  • perform some map operations on the two datasources separately
  • join them
  • perform some map and filter operations on the result
  • save the result

So then how many stages will we end up with?

  • 1 stage each for loading the two datasources in parallel = 2 stages
  • A third stage representing the join that is dependent on the other two stages
  • Note: all of the follow-on operations working on the joined data may be performed in the same stage because they must happen sequentially. There is no benefit to launching additional stages because they cannot start work until the prior operations are completed.

Here is that toy program

val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) } // stage 1: load + map the text file
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}                   // stage 2: build the second datasource
val spj = sfi.join(sp)                                                             // shuffle: stage 3 begins here
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}    // still stage 3 (no shuffle)
val sf = sm.filter{ case (k,v) => v % 10 == 0 }                                    // still stage 3
sf.saveAsTextFile("/data/blah/out")                                                // action: triggers the job

And here is the DAG of the result.

Now: how many tasks? The number of tasks should be equal to

the sum over all stages of (#Partitions in the stage)
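
For instance, for the toy program above, a rough way to count them at runtime (a sketch; getNumPartitions reports the partition count of each stage's final RDD, and the actual numbers depend on the input and on defaults such as spark.default.parallelism):

val stage1Tasks = sfi.getNumPartitions  // stage reading /data/blah/input
val stage2Tasks = sp.getNumPartitions   // stage building the parallelized range
val stage3Tasks = sf.getNumPartitions   // stage doing join + map + filter + save
val totalTasks  = stage1Tasks + stage2Tasks + stage3Tasks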
