How to merge two presorted rdds in spark?


Problem description

I have two large CSV files, each pre-sorted by one of its columns. Is there a way to exploit the fact that they are already sorted, so as to obtain a new sorted RDD faster, without having to do a full sort again?

Solution


The short answer: No, there is no way to leverage the fact that two input RDDs are already sorted when using the sort facilities offered by Apache Spark.

The long answer: Under certain conditions, there might be a better way than using sortBy or sortByKey.

The most obvious case is when the input RDDs are already sorted and represent distinct ranges. In this case, simply using rdd1.union(rdd2) is the fastest (virtually zero cost) way for combining the input RDDs, assuming that all elements in rdd1 come before all elements in rdd2 (according to the chosen ordering).
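As a minimal sketch of that case (the data, the key ranges, and the local master setting are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    // Made-up data: two pair RDDs, each already sorted by key, with non-overlapping key ranges.
    val sc = new SparkContext(new SparkConf().setAppName("presorted-union").setMaster("local[*]"))
    val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))  // keys 1..3
    val rdd2 = sc.parallelize(Seq((4, "d"), (5, "e"), (6, "f")))  // keys 4..6

    // union only concatenates the partition lists of the two RDDs: no shuffle, no sort.
    // Since every key in rdd1 precedes every key in rdd2, the result is globally sorted.
    val merged = rdd1.union(rdd2)
    merged.collect().foreach(println)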

When the ranges of the input RDDs overlap, things get more tricky. Assuming that the target RDD is to have only a single partition, it might be efficient to use toLocalIterator on both RDDs and then do the merge manually. If the result has to be an RDD, one could do this inside the compute method of a custom RDD type, processing the input RDDs and generating the output.
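A sketch of that single-partition variant, reusing the SparkContext sc from above; the mergeSorted helper is plain Scala, not a Spark API:

    // Made-up inputs whose key ranges overlap; each is sorted by key.
    val left  = sc.parallelize(Seq((1, "a"), (3, "c"), (5, "e")))
    val right = sc.parallelize(Seq((2, "b"), (4, "d"), (6, "f")))

    // Merge two already-sorted iterators into one sorted iterator (classic two-way merge).
    def mergeSorted[T](a: Iterator[T], b: Iterator[T])(implicit ord: Ordering[T]): Iterator[T] =
      new Iterator[T] {
        private val x = a.buffered
        private val y = b.buffered
        def hasNext: Boolean = x.hasNext || y.hasNext
        def next(): T =
          if (!y.hasNext) x.next()
          else if (!x.hasNext) y.next()
          else if (ord.lteq(x.head, y.head)) x.next()
          else y.next()
      }

    // toLocalIterator streams one partition at a time to the driver, so the merge happens
    // on the driver without materializing either RDD in memory at once.
    val mergedOnDriver =
      mergeSorted(left.toLocalIterator, right.toLocalIterator)(Ordering.by((p: (Int, String)) => p._1))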

When the inputs are large and thus consist of many partitions, things get even trickier. In this case, you probably want multiple partitions in the output RDD as well. You could use the custom RDD mentioned earlier, but create multiple partitions (using a RangePartitioner). Each partition would cover a distinct range of elements (in the optimal case, these ranges would cover roughly equally sized parts of the output).
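Below is a rough sketch of what such a custom RDD could look like, under the strong simplifying assumption that both parents already have the same number of partitions and that partition i of each parent covers the same key range (for example, because both were range-partitioned with identical bounds). The class and partition names are made up; this is illustrative, not a vetted implementation:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{OneToOneDependency, Partition, TaskContext}

    // One output partition per pair of aligned parent partitions.
    class AlignedPartition(val index: Int, val left: Partition, val right: Partition)
      extends Partition

    class MergeAlignedSortedRDD(a: RDD[(Int, String)], b: RDD[(Int, String)])
        extends RDD[(Int, String)](a.sparkContext,
          Seq(new OneToOneDependency(a), new OneToOneDependency(b))) {

      require(a.getNumPartitions == b.getNumPartitions, "parents must have aligned partitions")

      // Pair up the parents' partitions index by index (runs on the driver).
      override def getPartitions: Array[Partition] =
        a.partitions.zip(b.partitions).zipWithIndex.map {
          case ((pa, pb), i) => new AlignedPartition(i, pa, pb): Partition
        }

      // Merge the two pre-sorted parent partitions without any further sorting.
      override def compute(split: Partition, context: TaskContext): Iterator[(Int, String)] = {
        val p = split.asInstanceOf[AlignedPartition]
        val x = a.iterator(p.left, context).buffered
        val y = b.iterator(p.right, context).buffered
        new Iterator[(Int, String)] {
          def hasNext: Boolean = x.hasNext || y.hasNext
          def next(): (Int, String) =
            if (!y.hasNext) x.next()
            else if (!x.hasNext) y.next()
            else if (x.head._1 <= y.head._1) x.next()
            else y.next()
        }
      }
    }

    // Hypothetical usage: sortedA and sortedB are assumed to be range-partitioned
    // with identical partition bounds.
    // val mergedRdd = new MergeAlignedSortedRDD(sortedA, sortedB)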

The tricky part here is to avoid processing the complete input RDDs multiple times inside compute. When the input RDDs use a RangePartitioner, this can be done efficiently with filterByRange from OrderedRDDFunctions. When they do not use a RangePartitioner, but you know that the partitions are sorted internally and also follow a global order, you would first need to find out the effective ranges covered by those partitions by actually probing into the data.
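As a small illustration of the filterByRange path, reusing rdd1 and rdd2 from the union sketch above. Here sortByKey is used only to obtain RDDs that carry a RangePartitioner; in the scenario discussed, the inputs would already have one. The bounds are made-up stand-ins for one output partition's range:

    // sortByKey installs a RangePartitioner on its result.
    val byRangeA = rdd1.sortByKey()
    val byRangeB = rdd2.sortByKey()

    // Hypothetical bounds of a single output partition.
    val (lo, hi) = (2, 5)

    // filterByRange (from OrderedRDDFunctions) prunes partitions whose key range cannot
    // intersect [lo, hi] when a RangePartitioner is present, so each output partition
    // only reads the relevant slices of its parents.
    val sliceA = byRangeA.filterByRange(lo, hi)
    val sliceB = byRangeB.filterByRange(lo, hi)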

As the multiple-partition case is rather complex, I would check whether the custom-made sort is really faster than simply using sortBy or sortByKey. The logic behind sortBy and sortByKey is highly optimized with regard to the shuffling process (transferring data between nodes). For this reason, it may well be that in many cases these methods are faster than the custom-made logic, even though the custom-made logic could be O(n) while sortBy / sortByKey is O(n log(n)) at best.
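The baseline to benchmark any custom approach against would then simply be the straightforward re-sort (again reusing rdd1 and rdd2 from above):

    // Ignore the pre-sorting entirely: union both inputs and let Spark re-sort.
    // sortByKey performs a range-partitioned shuffle followed by a per-partition sort.
    val baseline = rdd1.union(rdd2).sortByKey()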

If you are interested in learning more about the shuffling logic used by Apache Spark, there is an article explaining the basic concept.
