How do I select a range of elements in a Spark RDD?


Question

I'd like to select a range of elements in a Spark RDD. For example, I have an RDD with a hundred elements, and I need to select elements from 60 to 80. How do I do that?

I see that RDD has a take(num: Int) method, which returns the first num elements. But there is no corresponding method to take the last i elements, or i elements from the middle starting at a certain index.

Answer

I don't think there is an efficient method to do this yet. But the easy way is to use filter(). Let's say you have an RDD called pairs holding key-value pairs and you only want elements with keys from 60 to 80 inclusive; just do:

val range60to80 = pairs.filter {
    case (k, v) => k >= 60 && k <= 80
    case _      => false // in case of invalid input
}
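
If the RDD's elements don't carry usable keys, a variation on the same filter idea (not part of the original answer) is Spark's RDD.zipWithIndex, which pairs each element with its position. Since zipWithIndex also exists on plain Scala collections, the predicate can be sketched locally; names like slice60to80 are illustrative only:

```scala
// Sketch of positional range selection on a local Seq as a stand-in.
// On an RDD the equivalent would be:
//   rdd.zipWithIndex().filter { case (_, i) => i >= 59 && i <= 79 }.map(_._1)
val elements = (1 to 100).map(x => s"elem$x")
val slice60to80 = elements.zipWithIndex
  .filter { case (_, i) => i >= 59 && i <= 79 } // zero-based index: items 60..80
  .map { case (x, _) => x }
```

Note that unlike the keyed filter above, this still scans every partition: it avoids a sort, but gains no partition pruning.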

I think it's possible that this could be done more efficiently in the future, by using sortByKey and saving information about the range of values mapped to each partition. Keep in mind this approach would only save anything if you were planning to query the range multiple times, because the sort is obviously expensive.

From looking at the Spark source, it would definitely be possible to do efficient range queries using RangePartitioner:

// An array of upper bounds for the first (partitions - 1) partitions
private val rangeBounds: Array[K] = {

This is a private member of RangePartitioner with knowledge of all the upper bounds of the partitions, so it would be easy to query only the necessary partitions. It looks like this is something Spark users may see in the future: SPARK-911 (https://issues.apache.org/jira/browse/SPARK-911)

UPDATE: A much better answer, based on the pull request I'm writing for SPARK-911. It will run efficiently if the RDD is sorted and you query it multiple times.

val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]]
val (lower, upper) = (10, 20)
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
  if (range.contains(i))
    for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
  else
    Iterator.empty
}
for ((k, v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")

If having the whole partition in memory is acceptable, you could even do something like this:

val glommedAndCached = sorted.glom().cache()
glommedAndCached.map(a => a.slice(a.search(lower), a.search(upper) + 1)).collect()

By the way, search is not a member of Array; I just made an implicit class that has a binary search function, not shown here.
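
A minimal sketch of what such an implicit class might look like (hypothetical; the original code is not shown): a lower-bound binary search over an array of pairs sorted by key, returning the index of the first key greater than or equal to the target, so that slice(a.search(lower), a.search(upper) + 1) picks out the inclusive key range.

```scala
// Hypothetical reconstruction of the `search` helper: lower-bound binary
// search on an Array[(Int, Int)] that is sorted by key.
object RangeSearch {
  implicit class SearchableArray(val a: Array[(Int, Int)]) extends AnyVal {
    // Index of the first element whose key is >= `key` (a.length if none).
    def search(key: Int): Int = {
      var lo = 0
      var hi = a.length
      while (lo < hi) {
        val mid = (lo + hi) >>> 1
        if (a(mid)._1 < key) lo = mid + 1
        else hi = mid
      }
      lo
    }
  }
}
```

The lower-bound convention is what makes the `+ 1` in the slice call produce an inclusive upper end.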

