How does range partitioner work in Spark?

Question

I'm not clear on how the range partitioner works in Spark. It uses reservoir sampling to take samples, and I'm confused by the way the boundaries of the input are computed.

  // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
  val sampleSize = math.min(20.0 * partitions, 1e6)
  // Assume the input partitions are roughly balanced and over-sample a little bit.
  val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt

Why should the calculated sampleSize be multiplied by 3.0? And how are the boundaries obtained? Can someone show me an example? Thank you!

Answer

Background on Range Partitioning

The code you're posting comes from the method used to take an unpartitioned RDD and partition it by a new range partitioner. This involves three steps (sketched in code right after the list below):

  1. Compute reasonable range boundaries
  2. Construct a partitioner from these range boundaries, which gives you a function from key K to partition index
  3. Shuffle the RDD against this new partitioner
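
For orientation, here is a minimal sketch of those three steps from the caller's side, assuming a local SparkContext and a small made-up pair RDD (the "local[*]" setup and the sample data are illustrative, not part of the original answer). The RangePartitioner constructor performs steps 1 and 2, and partitionBy performs step 3:

  import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

  // Illustrative setup: a local SparkContext and a tiny key/value RDD with 2 input partitions.
  val sc = new SparkContext(new SparkConf().setAppName("range-partitioner-sketch").setMaster("local[*]"))
  val pairs = sc.parallelize(Seq(("d", 1), ("a", 2), ("c", 3), ("b", 4)), 2)

  // Steps 1 and 2: sample the RDD, compute range bounds, and wrap them in a partitioner
  // that maps each key to a partition index.
  val partitioner = new RangePartitioner(2, pairs)

  // Step 3: shuffle the RDD against the new partitioner.
  val rangePartitioned = pairs.partitionBy(partitioner)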

Your question concerns the first of these steps. Ideally you could just collect all the RDD data, sort it, and determine range bounds that divide the sorted collection into nPartitions chunks. Easy!
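
As a purely illustrative, single-machine sketch of that idea (naiveRangeBounds is a made-up helper, not Spark code), the range bounds are simply the keys sitting at the 1/nPartitions, 2/nPartitions, ... positions of the sorted data:

  // Naive sketch: sort everything, cut the sorted keys into nPartitions equal chunks,
  // and record the key at each cut point as a range boundary.
  def naiveRangeBounds[K: Ordering](keys: Seq[K], nPartitions: Int): Seq[K] = {
    val sorted = keys.sorted
    (1 until nPartitions).map(i => sorted(i * sorted.length / nPartitions))
  }

  // For example, 12 keys split into 3 partitions gives 2 boundaries:
  // naiveRangeBounds(0 to 11, 3) == Seq(4, 8)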

Not so much. This algorithm is O(n log n) in compute and requires memory proportional to the collection. These facts (the second in particular) make it impractical to execute in the distributed Spark framework. But we don't need our partitions to be exactly balanced, as they would be after my terrible collect-and-sort implementation. As long as our partitions end up reasonably balanced, we're in the clear. If we can use an algorithm that gives us approximate quantile boundaries but runs faster, that's probably a win.

Okay, so we have the motivation for an efficient algorithm that runs quickly and doesn't take too much memory. Reservoir sampling turns out to be a great way to do this. If your collection has 1B elements and you sample 1M of them, the 10th percentile of your 1M elements is approximately equal to the 10th percentile of the 1B. You can run exactly the same collect-and-sort algorithm to determine range bounds, but on a reduced, randomly sampled subset of the full data.
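
For reference, a single-pass reservoir sample (Algorithm R) looks roughly like the sketch below. This is illustrative rather than Spark's actual code; Spark keeps a similar routine in an internal SamplingUtils helper.

  import scala.reflect.ClassTag
  import scala.util.Random

  // Reservoir sampling: after i items have been seen (i >= k), each of them is in the
  // reservoir with probability k / i, so the reservoir is a uniform random sample.
  def reservoirSample[T: ClassTag](input: Iterator[T], k: Int, rng: Random = new Random): Array[T] = {
    val reservoir = new Array[T](k)
    var i = 0
    while (input.hasNext) {
      val item = input.next()
      if (i < k) {
        reservoir(i) = item            // fill the reservoir with the first k items
      } else {
        val j = rng.nextInt(i + 1)     // keep the new item with probability k / (i + 1)
        if (j < k) reservoir(j) = item
      }
      i += 1
    }
    if (i < k) reservoir.take(i) else reservoir
  }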

The first line (sampleSize) estimates the number of samples required to represent the true range of values adequately. This is somewhat arbitrary and probably based on trial and error. But since you want to sample in parallel, you need to know how many values to take from each distributed partition, not how many to take overall. The second line (sampleSizePerPartition) estimates this number.

Earlier I mentioned that we want partitions to be approximately balanced. This is because a huge number of Spark functions rely on this property -- the sampleSizePerPartition code included. We know that partition sizes vary a bit, but we assume they don't vary too much. By sampling 3x more values from each partition than we would need if they were perfectly balanced, we can tolerate more partition imbalance.

Consider what would happen if you have 100,000 partitions. In this case, sampleSize is 2 million (20 * partitions), ignoring the 1 million cap in the first line.

If you take 20 random elements from each partition, then any partition with fewer than 20 elements leaves you with fewer samples than sampleSize overall. Taking 60 elements from each partition is aggressive, but it ensures you get enough samples in all but the most extreme imbalanced-partition scenarios.
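
Plugging that 100,000-partition scenario into the two quoted lines (and assuming, as the example does, that the input RDD also has 100,000 partitions) shows where the numbers come from, and also that the 1M cap kicks in at this scale:

  // Worked example for 100,000 partitions, using the two lines quoted in the question.
  val partitions = 100000
  val numInputPartitions = 100000          // assumed equal to the target partition count

  val sampleSize = math.min(20.0 * partitions, 1e6)
  // 20 * 100,000 = 2,000,000, but the cap brings this down to 1,000,000.

  val sampleSizePerPartition = math.ceil(3.0 * sampleSize / numInputPartitions).toInt
  // ceil(3 * 1,000,000 / 100,000) = 30 samples per input partition;
  // without the cap this would be the 60 mentioned above (3 * 20).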
