How to know the number of Spark jobs and stages in (broadcast) join query?

Problem Description

I use Spark 2.1.2.

I am trying to understand what the various Spark UI tabs display as a job runs. I use spark-shell --master local and run the following join query:

val df = Seq(
  (55, "Canada", -1, "", 0),
  (77, "Ontario", 55, "/55", 1),
  (100, "Toronto", 77, "/55/77", 2),
  (104, "Brampton", 100, "/55/77/100", 3)
).toDF("id", "name", "parentId", "path", "depth")

val dfWithPar = df.as("df1").
  join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter").
  select($"df1.*", $"df2.name" as "parentName")

dfWithPar.show

This is the physical query plan:

== Physical Plan ==
*Project [Id#11, name#12, parentId#13, path#14, depth#15, name#25 AS parentName#63]
+- *BroadcastHashJoin [parentId#13], [Id#24], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [Id#24, name#25]

I've got two questions about the query execution.

  1. Why are there two jobs for the query?

  2. Why is the stage view shown for both jobs identical? Below is a screenshot of the stage view of job id 1, which is exactly the same as that of job id 0. Why?

Solution

I use Spark 2.3.0 to answer your question (2.3.1-SNAPSHOT actually) since it is the latest and greatest at the time of this writing. That changes very little about query execution (if anything important) as the physical query plans in your 2.1.2 and my 2.3.0 are exactly the same (except the per-query codegen stage ID in round brackets).


After dfWithPar.show the structured query (that you built using Spark SQL's Dataset API for Scala) is optimized to the following physical query plan (I'm including it in my answer for better comprehension).

scala> dfWithPar.explain
== Physical Plan ==
*(1) Project [Id#11, name#12, parentId#13, path#14, depth#15, name#24 AS parentName#58]
+- *(1) BroadcastHashJoin [parentId#13], [Id#23], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [Id#23, name#24]

Number of Spark Jobs

Why are there two jobs for the query?

I'd say there are even three Spark jobs.

tl;dr One Spark job is for BroadcastHashJoinExec physical operator whereas the other two are for Dataset.show.

In order to understand the query execution and the number of Spark jobs of a structured query, it is important to understand the difference between structured queries (described using Dataset API) and RDD API.

Spark SQL's Datasets and Spark Core's RDDs both describe distributed computations in Spark. RDDs are the Spark "assembler" language (akin to the JVM bytecode) while Datasets are higher-level descriptions of structured queries using SQL-like language (akin to JVM languages like Scala or Java as compared to the JVM bytecode I used earlier).

What's important is that structured queries using Dataset API eventually end up as a RDD-based distributed computation (which could be compared to how the Java or Scala compilers transform the higher-level languages to the JVM bytecode).

Dataset API is an abstraction over RDD API and when you call an action on a DataFrame or Dataset that action transforms them to RDDs.
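
As a rough sketch (reusing dfWithPar from the spark-shell session above), you can look at both sides of that translation yourself through the standard queryExecution API:

// The high-level, Dataset-side description of the structured query
dfWithPar.explain

// The "compiled" form the query eventually becomes: an RDD[InternalRow],
// Spark SQL's internal binary row representation
val internalRdd = dfWithPar.queryExecution.toRdd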

With that, you should not be surprised that Dataset.show will in the end call an RDD action that in turn will run zero, one or many Spark jobs.

Dataset.show (with numRows equal to 20 by default) in the end calls showString, which does take(numRows + 1) to get an Array[Row].

val takeResult = newDf.select(castCols: _*).take(numRows + 1)

In other words, dfWithPar.show() is equivalent to dfWithPar.take(21), which in turn is equivalent to dfWithPar.head(21) as far as the number of Spark jobs is concerned.

You can see them and their number of jobs in the SQL tab. They should all be equal.
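
If you want to verify that yourself, here is a minimal sketch in the same spark-shell session (the exact job ids will differ, but the number of jobs per action should not):

// All three actions boil down to the same take-based execution
dfWithPar.show()     // show(20), which internally does take(21)
dfWithPar.take(21)   // Array[Row] with at most 21 rows
dfWithPar.head(21)   // same as take(21)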

show or take or head all lead to collectFromPlan that triggers the Spark jobs (by calling executeCollect).

What matters for answering your question about the number of jobs is knowing how all the physical operators in the query work. You simply have to know their behaviour at runtime and whether they trigger Spark jobs at all.
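
One way to see which physical operators you are dealing with is to walk the executed plan; this is a sketch using the regular queryExecution API and nothing specific to this query:

// Print the physical operators of the executed plan, top-down
dfWithPar.queryExecution.executedPlan.foreach(op => println(op.nodeName))
// prints something like: WholeStageCodegen, Project, BroadcastHashJoin,
// LocalTableScan, InputAdapter, BroadcastExchange, LocalTableScan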

BroadcastHashJoin and BroadcastExchangeExec Physical Operators

BroadcastHashJoinExec binary physical operator is used when the right side of a join can be broadcast, i.e. when its size is below spark.sql.autoBroadcastJoinThreshold (10M by default).
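
As a sketch (using the regular runtime configuration API; the value shown is the stock default and your session may differ), you can inspect or disable that threshold and watch the broadcast, and the Spark job that comes with it, disappear from newly planned queries:

// The broadcast threshold in bytes (10485760, i.e. 10 MB, by default)
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Disable auto-broadcasting; a query planned afterwards falls back to a
// non-broadcast join (typically SortMergeJoin)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
df.as("df1").join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter").explain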

BroadcastExchangeExec unary physical operator is used to broadcast rows (of a relation) to worker nodes (to support BroadcastHashJoinExec).

When BroadcastHashJoinExec is executed (to generate a RDD[InternalRow]), it creates a broadcast variable that in turn executes BroadcastExchangeExec (on a separate thread).

That's why Spark job 0 (shown in the UI as run at ThreadPoolExecutor.java:1149) was run.

You could see the single Spark job 0 ran if you executed the following:

// Just a single Spark job for the broadcast variable
val r = dfWithPar.rdd

That requires that the structured query is executed to produce an RDD, which is then the target of the action that gives the final result.

You would not have had the Spark job if you had not ended up with a broadcast join query.
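
For contrast, here is a quick hypothetical check in the same session: a plan without a broadcast exchange, e.g. the plain df with no join, produces its RDD without running any Spark job, which you can confirm by checking that no new job shows up in the Jobs tab:

// No join, hence no BroadcastExchangeExec in the plan and no Spark job here
val plainRdd = df.rdd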

RDD.take Operator

What I missed the very first moment when I answered the question was that the Dataset operators, i.e. show, take and head, will in the end lead to RDD.take.

take(num: Int): Array[T] Take the first num elements of the RDD. It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Please note when take says "It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit." That's the key to understand the number of Spark jobs in your broadcast join query.

Every iteration (in the description above) is a separate Spark job, starting with the very first partition and scanning 4 times as many partitions in every following iteration:

// RDD.take
def take(num: Int): Array[T] = withScope {
  ...
  while (buf.size < num && partsScanned < totalParts) {
    ...
    val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
    ...
  }
}

Have a look at the following RDD.take with 21 rows.

// The other two Spark jobs
r.take(21)

You will get 2 Spark jobs as in your query.

Guess how many Spark jobs you will have if you executed dfWithPar.show(1).

Why Are Stages Identical?

Why is the stage view shown for both jobs identical? Below is a screenshot of the stage view of job id 1, which is exactly the same as that of job id 0. Why?

That's easy to answer since both Spark jobs come from the same RDD.take(21).

The first Spark job is to scan the first partition and, since that partition did not have enough rows, another Spark job was run to scan more partitions.
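
If you want to see why one scan was not enough, here is a small sketch (reusing r = dfWithPar.rdd from above; the exact partition count depends on your master setting and may differ):

// How many partitions the query's RDD has and how many rows each one holds;
// the first scanned partition alone cannot satisfy take(21), hence the second job
r.getNumPartitions
r.mapPartitions(rows => Iterator(rows.size)).collect()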
