Would Spark preserve key order with this sortByKey/map/collect sequence?


Question

Let us say we have this:

val sx = sc.parallelize(Array((0, 39), (4, 47), (3, 51), (1, 98), (2, 61)))

Then we call

val sy = sx.sortByKey(true)

which would produce

sy = RDD[(0, 39), (1, 98), (2, 61), (3, 51), (4, 47)] 

Then we do

collected = sy.map(x => (x._2 / 10, x._2)).collect

Would we always get the following? I mean, would the original key order be preserved, despite changing the key values?

collected = [(3, 39), (9, 98), (6, 61), (5, 51), (4, 47)]

Answer

Applying the map() transformation and calling collect() does not change the ordering of the array elements returned by collect(). To prove this, we simply have to show that:

  • map() does not modify the ordering of elements within the RDD
  • collect() will always return the elements of the RDD in the same array order on every call

The first point is pretty easy to prove. Under the hood, a call to map() just produces a MapPartitionsRDD by iterating through every partition and calling the function argument passed to map() on every element within a partition. Therefore, no ordering is modified here, as the element ordering within each partition stays the same.
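To make this concrete, here is a minimal sketch for spark-shell (assuming sc is the shell's pre-built SparkContext) that tags each element with its partition index before and after the map(), showing that every element keeps both its partition and its position within it:

// Minimal spark-shell sketch; `sc` is the shell's SparkContext.
val sorted = sc.parallelize(Array((0, 39), (4, 47), (3, 51), (1, 98), (2, 61))).sortByKey(true)

// (partitionIndex, element) pairs before the map()...
sorted.mapPartitionsWithIndex((i, iter) => iter.map(e => (i, e))).collect.foreach(println)

// ...and after: each element stays in the same partition, in the same position.
sorted.map(x => (x._2 / 10, x._2))
  .mapPartitionsWithIndex((i, iter) => iter.map(e => (i, e)))
  .collect.foreach(println)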

The second point can be proven with a closer look at collect(). The following code is the implementation of collect(), along with the function that collect() calls.

From RDD.scala:

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

From SparkContext.scala:

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}

The runJob() function being called (an overloaded method) passes a Seq[Int] containing the order in which partitions will be processed to another runJob() method. This order eventually bubbles up to the scheduler, which determines how the action processes partitions. So in the case of collect(), partitions are always processed in sequential order, starting with the first one.
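This partition-by-partition assembly is easy to observe with glom(), which wraps each partition's contents in an Array. A sketch, again for spark-shell:

// Rebuild the sorted RDD from the question.
val sorted = sc.parallelize(Array((0, 39), (4, 47), (3, 51), (1, 98), (2, 61))).sortByKey(true)

// glom() turns each partition into one Array; collect() returns the outer
// Array in partition-index order, the same order runJob() hands back.
val perPartition = sorted.map(x => (x._2 / 10, x._2)).glom().collect()
perPartition.zipWithIndex.foreach { case (part, i) =>
  println(s"partition $i: ${part.mkString(", ")}")
}

// Concatenating the per-partition arrays reproduces collect()'s ordering,
// mirroring the Array.concat(results: _*) in collect()'s implementation.
println(perPartition.flatten.mkString(", "))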

Therefore, since neither map() nor collect() modifies the partition order or the ordering of elements within a partition, you will see the same ordering in the result of your collect() every time. However, if you apply a transformation that requires a shuffle before your collect(), all bets are off, as the data will be repartitioned.
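For illustration, a sketch of that caveat: inserting a shuffling transformation such as repartition() between the sort and the collect (the partition count of 3 below is arbitrary) removes any ordering guarantee:

// repartition() forces a shuffle, redistributing elements across partitions,
// so collect() no longer returns them in the sorted order.
val shuffled = sc.parallelize(Array((0, 39), (4, 47), (3, 51), (1, 98), (2, 61)))
  .sortByKey(true)
  .repartition(3)
  .map(x => (x._2 / 10, x._2))
println(shuffled.collect.mkString(", "))  // ordering here is not guaranteed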
