How to use Spark's repartitionAndSortWithinPartitions?


Problem Description

I am trying to build a minimal working example of repartitionAndSortWithinPartitions in order to understand the function. This is what I have so far (not working; distinct shuffles the values around so that they end up out of order):

def partval(partID:Int, iter: Iterator[Int]): Iterator[Tuple2[Int, Int]] = {
  iter.map( x => (partID, x)).toList.iterator
}

val part20to3_chaos = sc.parallelize(1 to 20, 3).distinct
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
part20to2_sorted.mapPartitionsWithIndex(partval).collect

but I receive the error:

Name: Compile Error
Message: <console>:22: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Int]
             val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)

I tried using the scaladoc, but wasn't able to find which class provides repartitionAndSortWithinPartitions. (By the way: this scaladoc is not impressive: why is MapPartitionsRDD missing? How can I search for a method?)

Realising I need a partitioner object, I next tried:

val rangePartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(rangePartitioner)
part20to2_sorted.mapPartitionsWithIndex(partval).collect

but got:

Name: Compile Error
Message: <console>:22: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[Int]
 required: org.apache.spark.rdd.RDD[_ <: Product2[?,?]]
Error occurred in an application involving default arguments.
         val rPartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)

How do I get this to compile? Could I get a working example, please?

Recommended Answer

Your problem is that part20to3_chaos is an RDD[Int], while OrderedRDDFunctions.repartitionAndSortWithinPartitions is a method which operates on an RDD[(K, V)], where K is the key and V is the value.

repartitionAndSortWithinPartitions will first repartition the data based on the provided partitioner, and then sort each partition by the key:

/**
 * Repartition the RDD according to the given partitioner and, 
 * within each resulting partition, sort records by their keys.
 *
 * This is more efficient than calling `repartition` and then sorting within each partition
 * because it can push the sorting down into the shuffle machinery.
 */
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] =
  self.withScope {
    new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
  }

So it looks like it's not exactly what you're looking for.
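That said, if you do want to call repartitionAndSortWithinPartitions on this data, a minimal sketch is to turn each element into a key-value pair first, e.g. with keyBy, so the implicit conversion to OrderedRDDFunctions applies. (This is a sketch assuming a Spark shell where sc is available; HashPartitioner is used here for illustration, but a RangePartitioner built over the keyed RDD would work as well.)

```scala
import org.apache.spark.HashPartitioner

// Key each Int by itself so we get an RDD[(Int, Int)]; pair RDDs pick up
// repartitionAndSortWithinPartitions via the OrderedRDDFunctions implicit.
val keyed = sc.parallelize(1 to 20, 3).distinct.keyBy(identity)

// Repartition into 2 partitions and sort by key within each partition.
val sorted = keyed.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// Tag each key with its partition ID, as in the question's partval helper.
sorted.mapPartitionsWithIndex { (partID, iter) =>
  iter.map { case (k, _) => (partID, k) }
}.collect
```

With a HashPartitioner the keys are spread by hash code rather than by range, so each partition's contents are sorted internally, but the partitions themselves don't form contiguous ranges.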

If you want a plain old sort, you can use sortBy, as it doesn't require a key:

scala> val toTwenty = sc.parallelize(1 to 20, 3).distinct
toTwenty: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at distinct at <console>:33

scala> val sorted = toTwenty.sortBy(identity, true, 3).collect
sorted: Array[Int] = 
    Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

where you pass sortBy the ordering (ascending or descending) and the number of partitions you want to create.

