How do I select a range of elements in a Spark RDD?
Question
I'd like to select a range of elements in a Spark RDD. For example, I have an RDD with a hundred elements, and I need to select elements from 60 to 80. How do I do that?
I see that RDD has a take(num: Int) method, which returns the first num elements. But there is no corresponding method to take the last n elements, or n elements from the middle starting at a certain index.
Answer
I don't think there is an efficient way to do this yet, but the easy way is to use filter(). Let's say you have an RDD pairs with key-value pairs and you only want the elements from 60 to 80 inclusive; just do:
// note: `60to80` is not a legal Scala identifier (it starts with a digit),
// so the val is renamed here
val range60to80 = pairs.filter {
  case (k, _) => k >= 60 && k <= 80
}
I think it is possible that this could be done more efficiently in the future, by using sortByKey and saving information about the range of values mapped to each partition. Keep in mind that this approach would only save anything if you were planning to query the range multiple times, because the sort is obviously expensive.
From looking at the Spark source, it would definitely be possible to do efficient range queries using RangePartitioner:
// An array of upper bounds for the first (partitions - 1) partitions
private val rangeBounds: Array[K] = {
rangeBounds is a private member of RangePartitioner with knowledge of all the upper bounds of the partitions; using it, it would be easy to query only the necessary partitions. It looks like this is something Spark users may see in the future: SPARK-911 (https://issues.apache.org/jira/browse/SPARK-911).
UPDATE: A much better answer, based on the pull request I'm writing for SPARK-911. It will run efficiently if the RDD is sorted and you query it multiple times.
// Sort by key once and cache, so repeated range queries are cheap.
val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]]
val (lower, upper) = (10, 20)
// Only the partitions that can contain keys in [lower, upper].
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
// Skip partitions outside the range entirely; filter keys within it.
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
  if (range.contains(i))
    for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
  else
    Iterator.empty
}
for ((k, v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")
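(A side note, not part of the original answer: the SPARK-911 work later landed in Spark as filterByRange on OrderedRDDFunctions, so on Spark 1.3 or later the range query above can be written directly. A minimal sketch, assuming a SparkContext sc is already available:)

```scala
// Assumes Spark 1.3+, where SPARK-911's range support landed as
// OrderedRDDFunctions.filterByRange, available on any RDD[(K, V)]
// with an Ordering[K]. On a range-partitioned RDD it only scans the
// partitions that can contain the requested keys.
val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val slice = sorted.filterByRange(10, 20) // pairs with keys in [10, 20]
slice.collect().foreach { case (k, v) => println(s"$k, $v") }
```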
If having the whole partition in memory is acceptable, you could even do something like this:
val glommedAndCached = sorted.glom().cache()
glommedAndCached.map(a => a.slice(a.search(lower), a.search(upper) + 1)).collect()
By the way, search is not a member of Array; I just made an implicit class that has a binary search function, not shown here.
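For completeness, here is one way such an implicit class could look. This is a sketch of my own, not code from the original answer: it assumes each glommed partition is an Array[(Int, Int)] sorted by key, and that search returns the index of the first pair whose key is at least the given key, which is what the slice call above needs.

```scala
object RangeSearch {
  // Hypothetical helper the answer alludes to: adds a `search` member to a
  // key-sorted Array[(Int, Int)]. Returns the index of the first pair whose
  // key is >= `key` (or a.length if every key is smaller), via binary search.
  implicit class KeySearch(val a: Array[(Int, Int)]) extends AnyVal {
    def search(key: Int): Int = {
      var lo = 0
      var hi = a.length // exclusive upper bound
      while (lo < hi) {
        val mid = lo + (hi - lo) / 2
        if (a(mid)._1 < key) lo = mid + 1
        else hi = mid
      }
      lo
    }
  }
}
```

With import RangeSearch._ in scope, a.slice(a.search(lower), a.search(upper) + 1) returns exactly the pairs whose keys fall in [lower, upper] for a sorted array.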