Coalesce reduces parallelism of entire stage (Spark)

Problem Description

Sometimes Spark "optimizes" a dataframe plan in an inefficient way. Consider the following example in Spark 2.1 (can also be reproduced in Spark 1.6):

import org.apache.spark.sql.functions.udf
import spark.implicits._ // for .toDF and the $"value" column syntax

val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble), 100).toDF("value")

// simulate an expensive per-row computation
val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })

val df_result = df
  .withColumn("udfResult", expensiveUDF($"value"))

df_result
  .coalesce(1)
  .write
  .saveAsTable(tablename) // tablename: a String defined elsewhere

In this example I want to write 1 file after an expensive transformation of a dataframe (this is just an example to demonstrate the issue). Spark moves the coalesce(1) up such that the UDF is only applied to a dataframe containing 1 partition, thus destroying parallelism (interestingly repartition(1) does not behave this way).

To generalize, this behavior occurs when I want to increase parallelism in a certain part of my transformation, but decrease parallelism thereafter.

I've found one workaround which consists of caching the dataframe and then triggering the complete evaluation of the dataframe:

val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble), 100).toDF("value")

val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })

val df_result = df
  .withColumn("udfResult", expensiveUDF($"value"))
  .cache()

df_result.rdd.count // trigger computation while the plan still has 100 partitions

df_result
  .coalesce(1)
  .write
  .saveAsTable(tablename)

My question is: is there another way to tell Spark not to decrease parallelism in such cases?

Recommended Answer

Actually this is not caused by SparkSQL's optimization: SparkSQL doesn't change the position of the Coalesce operator, as the executed plan shows:

Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
   +- *SerializeFromObject [input[0, double, false] AS value#2]
      +- Scan ExternalRDDScan[obj#1]
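
You can reproduce the plan comparison yourself. A minimal sketch (assuming df_result is built as in the question, in a spark-shell session):

// coalesce(1) produces a plan with no Exchange node, so the whole
// stage, including the UDF, collapses into a single task.
df_result.coalesce(1).explain()

// repartition(1) inserts an Exchange (a shuffle), so the UDF still
// runs on all 100 upstream partitions.
df_result.repartition(1).explain()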

I quote a paragraph from the coalesce API's description:

Note: this paragraph was added by SPARK-19399, so it does not appear in the 2.0 API docs.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

The coalesce API doesn't perform a shuffle; instead it creates a narrow dependency between the previous RDD and the current one. Since RDDs are lazily evaluated, the computation is actually carried out on the already-coalesced partitions.
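
The difference between the two kinds of dependencies is visible in the RDD lineage. A minimal sketch (assuming sc is the SparkContext of a spark-shell session):

val rdd = sc.parallelize(1 to 500, 100)

// Narrow dependency: no stage boundary appears in the lineage.
println(rdd.coalesce(1).toDebugString)

// Shuffle dependency: the lineage shows a ShuffledRDD stage boundary.
println(rdd.repartition(1).toDebugString)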

To prevent this, use the repartition API instead.
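
Applied to the question's job, a minimal sketch: the shuffle added by repartition(1) lets the expensive UDF run on all 100 upstream partitions, while the output is still written as a single partition.

df_result
  .repartition(1) // adds a shuffle step, preserving upstream parallelism
  .write
  .saveAsTable(tablename)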
