DataFrame orderBy followed by limit in Spark

Problem description

I have a program that generates a DataFrame on which it will run something like:

    df.select("Col1", "Col2", ...)
      .orderBy("ColX")
      .limit(N)

However, when I collect the data at the end, I find that it causes the driver to OOM if I take a large enough top N.

Another observation is that if I do just the sort or just the top on its own, this problem does not happen. So it occurs only when the sort and the top are used together.

I am wondering why this happens. In particular, what is really going on underneath this combination of the two transformations? How does Spark evaluate a query with both a sort and a limit, and what is the corresponding execution plan?

Also, just out of curiosity, does Spark handle sort and top differently between DataFrames and RDDs?

EDIT: Sorry, I didn't mean collect. What I originally meant is that the problem occurs when I call any action to materialize the data, regardless of whether it is collect (or any other action that sends data back to the driver) or not, so the problem is definitely not the output size.

Answer

While it is not clear why it fails in this particular case, there are multiple issues you may encounter:

  • When you use limit, it simply puts all data on a single partition, no matter how big n is. So while it doesn't explicitly collect, it is almost as bad (see the plan-inspection sketch after this list).
  • On top of that, orderBy requires a full shuffle with range partitioning, which can cause additional issues when the data distribution is skewed.
  • Finally, when you collect, the result can be larger than the amount of memory available on the driver.
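
To see how Spark actually evaluates a query that combines a sort with a limit, you can print its execution plan. The following is a minimal sketch, not part of the original answer: the DataFrame, column names and sizes are invented for illustration, and explain(true) prints the parsed, analyzed, optimized and physical plans.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sort-limit-plan")
      .getOrCreate()

    // Synthetic stand-in for the asker's DataFrame.
    val df = spark.range(0L, 1000000L)
      .selectExpr("id AS Col1", "id % 100 AS Col2", "rand() AS ColX")

    // explain(true) prints the logical and physical plans, which show how the
    // orderBy and the limit are combined at execution time.
    df.select("Col1", "Col2", "ColX")
      .orderBy("ColX")
      .limit(1000)
      .explain(true)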

If you collect anyway, there is not much you can improve here. At the end of the day driver memory will be the limiting factor, but there are still some possible improvements:

  • First of all, don't use limit.
  • Replace collect with toLocalIterator.
  • Use either orderBy |> rdd |> zipWithIndex |> filter or, if the exact number of values is not a hard requirement, filter the data directly based on an approximated distribution, as shown in Saving a spark dataframe in multiple parts without repartitioning (in Spark 2.0.0+ there is a handy approxQuantile method). A sketch of these alternatives follows this list.
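
As a rough illustration of the last two suggestions, here is a minimal sketch that is not part of the original answer; df, the column name "ColX" and the value of n are placeholders for the asker's actual data:

    import org.apache.spark.sql.DataFrame

    // orderBy |> rdd |> zipWithIndex |> filter: keep the first n rows of the
    // sorted data without funnelling everything into a single partition.
    def topNWithoutLimit(df: DataFrame, orderCol: String, n: Long): DataFrame = {
      val sorted = df.orderBy(orderCol)
      val firstN = sorted.rdd
        .zipWithIndex()
        .filter { case (_, idx) => idx < n }
        .map { case (row, _) => row }
      df.sparkSession.createDataFrame(firstN, sorted.schema)
    }

    // Replace collect with toLocalIterator so rows are streamed to the driver
    // one partition at a time instead of being materialized all at once.
    // import scala.collection.JavaConverters._
    // topNWithoutLimit(df, "ColX", 1000000L).toLocalIterator().asScala.foreach(println)

    // Spark 2.0.0+: if an approximate top N is acceptable, approxQuantile gives
    // a cutoff for the ordering column and a plain filter does the rest.
    // val Array(cutoff) = df.stat.approxQuantile("ColX", Array(n.toDouble / df.count()), 0.01)
    // val approxTopN = df.filter(df("ColX") <= cutoff)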
