How to merge two presorted RDDs in Spark?
Problem description
I have two large CSV files, each pre-sorted by one of their columns. Is there a way to exploit the fact that they are already sorted to obtain a new sorted RDD faster, without a full sort again?
Solution
The short answer: No, there is no way to leverage the fact that two input RDDs are already sorted when using the sort facilities offered by Apache Spark.
The long answer: Under certain conditions, there might be a better way than using sortBy or sortByKey.
The most obvious case is when the input RDDs are already sorted and represent distinct ranges. In this case, simply using rdd1.union(rdd2) is the fastest (virtually zero-cost) way to combine the input RDDs, assuming that all elements in rdd1 come before all elements in rdd2 (according to the chosen ordering).
When the ranges of the input RDDs overlap, things get trickier. If the target RDD is to have only a single partition, it might be efficient to call toLocalIterator on both RDDs and then merge them manually. If the result has to be an RDD, one could do this inside the compute method of a custom RDD type, processing the input RDDs and generating the output.
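A minimal sketch of the single-partition merge, using plain Python iterators where Spark would supply the results of toLocalIterator (heapq.merge performs the linear-time merge of pre-sorted inputs):

```python
import heapq

def merge_presorted(it1, it2):
    """Lazily merge two already-sorted iterators in linear time.

    In Spark, it1 and it2 could come from rdd1.toLocalIterator() and
    rdd2.toLocalIterator(); plain iterators stand in here.
    """
    return heapq.merge(it1, it2)

a = iter([1, 4, 9])
b = iter([2, 3, 10])
print(list(merge_presorted(a, b)))  # [1, 2, 3, 4, 9, 10]
```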
When the inputs are large and thus consist of many partitions, things get even trickier. In this case, you probably want multiple partitions in the output RDD as well. You could use the custom RDD mentioned earlier, but create multiple partitions (using a RangePartitioner). Each partition would cover a distinct range of elements (in the optimal case, these ranges would cover roughly equally sized parts of the output).
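A local sketch of the multi-partition idea: merge once, then route each element to an output partition by range. The boundaries mimic what a RangePartitioner would derive from sampling; the function name is illustrative, not a Spark API:

```python
import bisect
import heapq

def merge_into_range_partitions(sorted_a, sorted_b, upper_bounds):
    """Merge two sorted sequences and split the result into range
    partitions. upper_bounds[i] is the exclusive upper limit of
    partition i, similar to boundaries a RangePartitioner samples."""
    parts = [[] for _ in range(len(upper_bounds) + 1)]
    for x in heapq.merge(sorted_a, sorted_b):
        # bisect_right picks the first partition whose bound exceeds x
        parts[bisect.bisect_right(upper_bounds, x)].append(x)
    return parts

print(merge_into_range_partitions([1, 6, 11], [2, 7, 12], [5, 10]))
# [[1, 2], [6, 7], [11, 12]]
```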
The tricky part is to avoid processing the complete input RDDs multiple times inside compute. When the input RDDs use a RangePartitioner, this can be avoided efficiently with filterByRange from OrderedRDDFunctions. When they do not use a RangePartitioner, but you know that each partition is internally sorted and that the partitions have a global order, you first need to find out the effective range covered by each partition by actually probing into the data.
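Probing the effective ranges can be sketched locally as well: for an internally sorted partition, only its first and last element need to be inspected. In Spark one might do this per partition with mapPartitionsWithIndex; the helper name below is made up for illustration:

```python
def probe_partition_ranges(partitions):
    """Return the (min, max) range covered by each non-empty sorted
    partition, reading only its first and last element."""
    return [(p[0], p[-1]) for p in partitions if p]

parts = [[1, 2, 4], [5, 8], [9, 9, 13]]
print(probe_partition_ranges(parts))  # [(1, 4), (5, 8), (9, 13)]
```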
As the multiple-partition case is rather complex, I would first check whether the custom-made sort is really faster than simply using sortBy or sortByKey. The logic behind sortBy and sortByKey is highly optimized regarding the shuffling process (transferring data between nodes). For this reason, these methods may well be faster than the custom-made logic in many cases, even though the custom logic could be O(n) while sortBy/sortByKey is at best O(n log n).
If you are interested in learning more about the shuffling logic used by Apache Spark, there is an article explaining the basic concept.