Does pyspark change the order of instructions for optimization?

Question

Suppose I have the following pipeline:

df.orderBy('foo').limit(10).show()

Here the orderBy instruction comes first, so all rows of the dataframe should be sorted before the limit instruction is executed. I found myself wondering whether Spark does some "reorganization" inside the pipeline to improve performance (for example, executing the limit instruction before the orderBy). Does Spark do that?

Answer

Your assumption is correct. Spark executes sort and then limit on each partition before merging/collecting the results, as we will see next.
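A quick way to see this for yourself is to print the physical plan: the orderBy plus limit pair is planned as a single TakeOrderedAndProject node rather than a full sort followed by a separate limit. A minimal sketch, assuming a local SparkSession and a toy dataframe with a foo column:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Toy dataframe; any dataframe with a 'foo' column would do.
df = spark.createDataFrame([(i,) for i in range(100)], ["foo"])

# The physical plan should contain a TakeOrderedAndProject node, i.e. the
# limit is fused with the ordering rather than applied after a full sort.
df.orderBy("foo").limit(10).explain()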

An orderBy followed by a limit triggers the following calls:

  • [Dataset.scala] Dataset:orderBy()
  • [Dataset.scala] Dataset:sortInternal()
  • [SparkStrategies.scala] SpecialLimits:apply()
  • [limit.scala] TakeOrderedAndProjectExec:doExecute()

Looking into the TakeOrderedAndProjectExec:doExecute() method, we first encounter the following code:

protected override def doExecute(): RDD[InternalRow] = {
    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
    val localTopK: RDD[InternalRow] = {
      child.execute().map(_.copy()).mapPartitions { iter =>
        org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
      }
    }

......

Here we can see that localTopK is populated by taking the top-K records from each sorted partition. That means Spark pushes the top-K filter down to the partition level as early as possible.
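As a rough conceptual analogue of this per-partition step (a sketch only, written against the RDD API; it is not the code path Spark actually runs), the same idea can be expressed with mapPartitions and a heap:

import heapq

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

limit = 10
rdd = spark.sparkContext.parallelize(range(100), numSlices=4)

# Each partition keeps only its own smallest `limit` elements, mirroring
# Utils.takeOrdered(iter, limit)(ord) being applied per partition.
local_top_k = rdd.mapPartitions(lambda it: heapq.nsmallest(limit, it))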

The next lines:

....

val shuffled = new ShuffledRowRDD(
      ShuffleExchangeExec.prepareShuffleDependency(
        localTopK,
        child.output,
        SinglePartition,
        serializer,
        writeMetrics),
      readMetrics)
    shuffled.mapPartitions { iter =>
      val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
      if (projectList != child.output) {
        val proj = UnsafeProjection.create(projectList, child.output)
        topK.map(r => proj(r))
      } else {
        topK
      }
    }

These will generate the final ShuffledRowRDD from all partitions, which contains the top-K sorted records composing the final result of limit.
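Continuing the conceptual sketch from above (again only an analogue; repartition(1) stands in for the shuffle to SinglePartition), the final step looks roughly like:

# Bring every partition's local top-K into one partition and take the global
# top-K from the merged candidates.
final_top_k = (
    local_top_k
    .repartition(1)
    .mapPartitions(lambda it: heapq.nsmallest(limit, it))
    .collect()
)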

Example

Let's illustrate this with an example. Consider a dataset with the range 1, 2, 3, ..., 20, partitioned into two parts: the first contains the odd numbers and the second the even numbers, as shown next:

-----------   -----------
|   P1    |   |   P2    | 
-----------   -----------
|   1     |   |   2     |
|   3     |   |   4     |
|   5     |   |   6     |
|   7     |   |   8     |
|   9     |   |   10    |
|  ....   |   |  ....   |
|   19    |   |   20    |
-----------   -----------

When df.orderBy(...).limit(5) is executed, Spark gets the top 5 sorted records from each partition, i.e. 1, 3, 5, 7, 9 from the first and 2, 4, 6, 8, 10 from the second. It then merges and sorts them into the sequence 1, 2, 3, ..., 10 and finally takes the top 5 records, producing the final list 1, 2, 3, 4, 5.
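For reference, here is a small runnable recreation of this example (a sketch assuming a local SparkSession; the partition contents are forced by unioning two single-partition RDDs):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

# P1 holds the odd numbers, P2 the even numbers, each in its own partition.
p1 = sc.parallelize([(i,) for i in range(1, 21, 2)], 1)
p2 = sc.parallelize([(i,) for i in range(2, 21, 2)], 1)
df = spark.createDataFrame(p1.union(p2), ["value"])

# Spark takes the local top 5 of each partition, merges them in a single
# partition, and returns the global top 5: 1, 2, 3, 4, 5.
df.orderBy("value").limit(5).show()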

Conclusion

When an orderBy is followed by a limit, Spark leverages all the available information and avoids processing the whole dataset, working only with the top-K rows. As @ShemTov already mentioned, there is no need to call limit before orderBy: first, that would return an incorrect dataset, and second, Spark already performs all the necessary optimisations internally for you.
