Return RDD of largest N values from another RDD in SPARK
Question
I'm trying to filter an RDD of tuples to return the largest N tuples based on key values. I need the return format to be an RDD.
So the RDD:
[(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')]
filtered for the largest 3 keys should return the RDD:
[(6,'p'), (12,'e'), (49,'y')]
Doing a sortByKey() and then take(N) returns the values and doesn't result in an RDD, so that won't work.
I could return all of the keys, sort them, find the Nth largest value, and then filter the RDD for key values greater than that, but that seems very inefficient.
What would be the best way to do this?
Answer
With RDD
A quick but not particularly efficient solution is to follow sortByKey with zipWithIndex and filter:
n = 3
rdd = sc.parallelize([(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')])

# sort descending so the first n indices correspond to the n largest keys
rdd.sortByKey(ascending=False).zipWithIndex().filter(lambda xi: xi[1] < n).keys()
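Outside Spark, the selection above is just a descending sort followed by a slice; as a plain-Python sanity check of what the pipeline computes (local data, no SparkContext):

```python
# plain-Python model of "sort by key descending, keep the first n"
data = [(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')]
n = 3

top = sorted(data, key=lambda kv: kv[0], reverse=True)[:n]
# → [(49, 'y'), (12, 'e'), (6, 'p')]
```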
If n is relatively small compared to the RDD size, a slightly more efficient approach is to avoid a full sort:
import heapq

def key(kv):
    return kv[0]

# keep only the n largest pairs from each partition first
top_per_partition = rdd.mapPartitions(lambda it: heapq.nlargest(n, it, key=key))

# then sort the surviving candidates descending and keep the first n
top_per_partition.sortByKey(ascending=False).zipWithIndex().filter(lambda xi: xi[1] < n).keys()
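The per-partition step relies on heapq.nlargest, which returns the n largest elements of any iterable, largest-first, without sorting the whole input; a quick local illustration with the same kind of key function:

```python
import heapq

pairs = [(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')]

# the 3 largest pairs by key, largest-first
top3 = heapq.nlargest(3, pairs, key=lambda kv: kv[0])
# → [(49, 'y'), (12, 'e'), (6, 'p')]
```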
If keys are much smaller than values and the order of the final output doesn't matter, then a filter approach can work just fine:
keys = rdd.keys()
identity = lambda x: x

# offset is the nth largest key: take the n largest candidates from each
# partition, sort them descending, keep the first n, and take their minimum
offset = (keys
    .mapPartitions(lambda it: heapq.nlargest(n, it))
    .sortBy(identity, ascending=False)
    .zipWithIndex()
    .filter(lambda xi: xi[1] < n)
    .keys()
    .min())

rdd.filter(lambda kv: kv[0] >= offset)
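The idea behind the offset is that the nth largest key acts as a threshold: every pair whose key reaches it belongs in the result. A small local model of that two-step logic (find the cutoff, then filter in one pass):

```python
import heapq

pairs = [(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')]
n = 3

# the last of the n largest keys is the cutoff (nth largest key)
offset = heapq.nlargest(n, [k for k, _ in pairs])[-1]

# one pass over the data keeps every pair at or above the cutoff
result = [kv for kv in pairs if kv[0] >= offset]
```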
Also, it won't keep exactly n values in case of ties.
With DataFrames
You can simply orderBy and limit:
from pyspark.sql.functions import col
rdd.toDF().orderBy(col("_1").desc()).limit(n)