How to filter RDDs based on a given partition?


Question

Consider the following example:

// input is assumed to be a JavaSparkContext; the pair key is the first column of each line
JavaPairRDD<String, Row> R = input.textFile("test").mapToPair(new PairFunction<String, String, Row>() {
    public Tuple2<String, Row> call(String arg0) throws Exception {
        String[] parts = arg0.split(" ");
        Row r = RowFactory.create(parts[0], parts[1]);
        return new Tuple2<String, Row>(r.get(0).toString(), r);
    }
}).partitionBy(new HashPartitioner(20));

The code above creates an RDD named R, partitioned into 20 pieces by hashing on the first column of a text file named "test".

Assume the test.txt file has the following format:

...
valueA1 valueB1
valueA1 valueB2
valueA1 valueB3
valueA1 valueB4
... 

In my context, I have a known value, e.g. valueA1, and I want to retrieve all the other values. It is trivial to do this with the existing filter operation and the specified value; however, I would like to avoid it, since the filter would essentially be performed on the whole RDD.

Assume that hash(valueA1) = 3; I would like to perform a given operation only on partition 3. More generally, I am interested in dropping/selecting specific partitions from an RDD and performing operations on them.
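As an aside, the partition index does not have to be assumed: it can be computed with the same partitioner that was passed to partitionBy. A minimal Java sketch, reusing the partition count and key from the question:

import org.apache.spark.HashPartitioner;

// Must match the partitioner used in partitionBy(new HashPartitioner(20)) above.
HashPartitioner partitioner = new HashPartitioner(20);

// Index of the partition that holds every pair whose key is "valueA1".
int targetPartition = partitioner.getPartition("valueA1");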

From the Spark API it seems that this is not possible directly; is there a workaround to achieve the same thing?

Answer

For single keys you can use the lookup method:

rdd.lookup("a")

// Seq[Int] = ArrayBuffer(1, 4)
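
Applied to the question's JavaPairRDD R, the equivalent call would be as follows (a sketch, assuming R is hash-partitioned as in the question):

import java.util.List;
import org.apache.spark.sql.Row;

// Because R has a known partitioner, lookup only scans the single
// partition that the key "valueA1" maps to.
List<Row> values = R.lookup("valueA1");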

For an efficient lookup you'll need an RDD that is partitioned, for example with a HashPartitioner as below.

If you simply want to filter the partitions containing specific keys, it can be done with mapPartitionsWithIndex:

import org.apache.spark.HashPartitioner

// The particular partition count is used only to get a reproducible output
val rdd = sc.parallelize(
  Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5))
).partitionBy(new HashPartitioner(8))

val keys = Set("a", "c")
// Indices of the partitions that can contain the wanted keys:
// HashPartitioner sends a key to hashCode % numPartitions (made non-negative)
val parts = keys.map(_.## % rdd.partitions.size)

rdd.mapPartitionsWithIndex((i, iter) =>
  if (parts.contains(i)) iter.filter { case (k, _) => keys.contains(k) }
  else Iterator()
).collect

// Array[(String, Int)] = Array((a,1), (a,4), (c,3))
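
This is not part of the original answer, but translated to the Java API used in the question, the same approach might look like the sketch below (untested; R and the key "valueA1" are taken from the question):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// Partition that the known key hashes to (same partitioner as in the question).
int targetPartition = new HashPartitioner(20).getPartition("valueA1");

// Skip every partition except the target one, then keep only the matching key.
JavaRDD<Tuple2<String, Row>> matches = R.mapPartitionsWithIndex(
    (index, iterator) -> {
        if (index != targetPartition) {
            return Collections.<Tuple2<String, Row>>emptyIterator();
        }
        List<Tuple2<String, Row>> kept = new ArrayList<>();
        iterator.forEachRemaining(t -> {
            if (t._1().equals("valueA1")) {
                kept.add(t);
            }
        });
        return kept.iterator();
    },
    false);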
