How to split an RDD into two or more RDDs?


Question

I'm looking for a way to split an RDD into two or more RDDs. The closest I've seen is Scala Spark: Split collection into several RDDs? (http://stackoverflow.com/questions/27231524/scala-spark-split-collection-into-several-rdd), which still leaves you with a single RDD.

If you're familiar with SAS, something like this:

data work.split1 work.split2;
    set work.preSplit;

    if (condition1) then
        output work.split1;
    else if (condition2) then
        output work.split2;
run;

which results in two distinct data sets. It would have to be persisted immediately to get the results I intend...

Answer

One way is to use a custom partitioner to partition the data according to your filter condition. This can be achieved by extending Partitioner and implementing something similar to the RangePartitioner.
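
As a minimal sketch of what such a partitioner might look like (SplitPartitioner is a hypothetical name, the predicate passed to it stands in for condition1 from the SAS example, and rdd is assumed to be an existing RDD[Int]):

import org.apache.spark.Partitioner

// Hypothetical two-way partitioner: records satisfying the predicate go to
// partition 0, everything else to partition 1, mirroring the SAS split above.
class SplitPartitioner(condition1: Int => Boolean) extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int =
    if (condition1(key.asInstanceOf[Int])) 0 else 1
}

// partitionBy is only available on pair RDDs, so key each element by itself,
// partition, then drop the synthetic key (physical placement is preserved).
val partitioned = rdd
  .map(x => (x, x))
  .partitionBy(new SplitPartitioner(_ % 2 == 0))
  .map(_._2)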

A mapPartitions can then be used to construct multiple RDDs from the partitioned RDD without reading all of the data.

import org.apache.spark.TaskContext

val filtered = partitioned.mapPartitions { iter =>
  new Iterator[Int] {
    // Emit elements only when this task is working on one of the partitions
    // we want to keep; all other partitions come out empty.
    override def hasNext: Boolean =
      if (rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId())) iter.hasNext
      else false

    override def next(): Int = iter.next()
  }
}

Just be aware that the number of partitions in the filtered RDDs will be the same as the number in the partitioned RDD, so a coalesce should be used to reduce this down and remove the empty partitions.
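
Putting it together, a hedged end-to-end sketch (keepPartitions is an illustrative helper, not a Spark API; partitioned is the hypothetical two-way partitioned RDD from above):

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

// Keep only the elements whose physical partition id is in the given set,
// then coalesce so the now-empty partitions disappear.
def keepPartitions(rdd: RDD[Int], ids: Set[Int]): RDD[Int] =
  rdd.mapPartitions { iter =>
    if (ids.contains(TaskContext.get().partitionId())) iter else Iterator.empty
  }.coalesce(ids.size)

val split1 = keepPartitions(partitioned, Set(0))  // rows matching the predicate
val split2 = keepPartitions(partitioned, Set(1))  // everything else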
