How to use Spark's repartitionAndSortWithinPartitions?
Question
I am trying to build a minimal working example of repartitionAndSortWithinPartitions in order to understand the function. This is what I have so far (not working; distinct throws the values around so that they get out of order):
def partval(partID: Int, iter: Iterator[Int]): Iterator[(Int, Int)] = {
  iter.map(x => (partID, x))
}
val part20to3_chaos = sc.parallelize(1 to 20, 3).distinct
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
part20to2_sorted.mapPartitionsWithIndex(partval).collect
but I receive the error:
Name: Compile Error
Message: <console>:22: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Int]
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
I tried using the scaladoc, but wasn't able to find which class provides repartitionAndSortWithinPartitions. (By the way: this scaladoc is not impressive. Why is MapPartitionsRDD missing? And how can I search for a method?)
Realising I need a partitioner object, I next tried:
val rangePartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(rangePartitioner)
part20to2_sorted.mapPartitionsWithIndex(partval).collect
but got:
Name: Compile Error
Message: <console>:22: error: type mismatch;
found : org.apache.spark.rdd.RDD[Int]
required: org.apache.spark.rdd.RDD[_ <: Product2[?,?]]
Error occurred in an application involving default arguments.
val rPartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
How do I get this to compile? Could I get a working example, please?
Answer
Your problem is that part20to3_chaos is an RDD[Int], while OrderedRDDFunctions.repartitionAndSortWithinPartitions is a method that operates on an RDD[(K, V)], where K is the key and V is the value.
repartitionAndSortWithinPartitions will first repartition the data based on the provided partitioner, and then sort by the key:
/**
* Repartition the RDD according to the given partitioner and,
* within each resulting partition, sort records by their keys.
*
* This is more efficient than calling `repartition` and then sorting within each partition
* because it can push the sorting down into the shuffle machinery.
*/
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
So it looks like it's not exactly what you're looking for.
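If you do want to call repartitionAndSortWithinPartitions itself, one way to satisfy the RDD[(K, V)] requirement is to key each element by itself. A minimal sketch for the spark-shell (assuming the usual `sc` SparkContext):

```scala
// Key each Int by itself so we get an RDD[(Int, Int)]; the implicit
// conversion to OrderedRDDFunctions then makes the method available.
val keyed = sc.parallelize(1 to 20, 3).distinct.map(x => (x, x))

// RangePartitioner also wants a keyed RDD, so build it from `keyed`.
val rangePartitioner = new org.apache.spark.RangePartitioner(2, keyed)

// Repartition into two range partitions, sorting each partition by key.
val repartSorted = keyed.repartitionAndSortWithinPartitions(rangePartitioner)

// Tag each value with its partition ID to inspect the result.
repartSorted.mapPartitionsWithIndex((partID, iter) => iter.map(x => (partID, x))).collect
```

With a RangePartitioner, each partition should now hold a contiguous, sorted range of keys (roughly 1–10 in partition 0 and 11–20 in partition 1, though the exact split point is up to the partitioner's sampling).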
If you want a plain old sort, you can use sortBy, as it doesn't require a key:
scala> val toTwenty = sc.parallelize(1 to 20, 3).distinct
toTwenty: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at distinct at <console>:33
scala> val sorted = toTwenty.sortBy(identity, true, 3).collect
sorted: Array[Int] =
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
Here you pass sortBy the sort order (ascending or descending) and the number of partitions you want to create.
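To see how sortBy distributes the sorted data, you can tag each value with its partition ID, much like the partval helper in the question. A sketch for the spark-shell (descending order this time):

```scala
// Sort descending into 2 partitions.
val desc = sc.parallelize(1 to 20, 3).distinct
  .sortBy(identity, ascending = false, numPartitions = 2)

// Tag each value with its partition ID to inspect the layout.
desc.mapPartitionsWithIndex((partID, iter) => iter.map(x => (partID, x))).collect
// sortBy range-partitions by the sort key, so partition 0 should hold the
// larger values in descending order and partition 1 the smaller ones.
```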