Return RDD of largest N values from another RDD in SPARK

Problem description

I'm trying to filter an RDD of tuples to return the largest N tuples based on key values. I need the return format to be an RDD.

So, given the RDD:

[(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')]

filtered for the largest 3 keys should return the RDD:

[(6,'p'), (12,'e'), (49,'y')]

Doing a sortByKey() and then take(N) returns the values and doesn't result in an RDD, so that won't work.
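
For reference, a quick illustration of that point (a sketch, assuming rdd holds the sample data above):

top3 = rdd.sortByKey(ascending=False).take(3)
print(type(top3))  # <class 'list'> -- a local list on the driver, not an RDD
print(top3)        # [(49, 'y'), (12, 'e'), (6, 'p')]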

I could return all of the keys, sort them, find the Nth largest value, and then filter the RDD for key values greater than that, but that seems very inefficient.
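
A minimal sketch of that idea (hypothetical names; it collects every key to the driver, which is why it feels inefficient):

all_keys = rdd.keys().collect()                        # every key on the driver
threshold = sorted(all_keys, reverse=True)[N - 1]      # N-th largest key
largest_n = rdd.filter(lambda kv: kv[0] >= threshold)  # still an RDD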

What would be the best way to do this?

Recommended answer

With RDD

A quick but not particularly efficient solution is to sortByKey, then use zipWithIndex and filter:

n = 3
rdd = sc.parallelize([(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')])

# sort descending so the first n rows are the largest keys
rdd.sortByKey(ascending=False).zipWithIndex().filter(lambda xi: xi[1] < n).keys()
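
Note that after zipWithIndex each element is a ((key, value), index) pair, so the trailing keys() drops the index and returns the original tuples, and the result remains an RDD.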

If n is relatively small compared to the RDD size, a slightly more efficient approach is to avoid a full sort:

import heapq

def key(kv):
    return kv[0]

top_per_partition = rdd.mapPartitions(lambda it: heapq.nlargest(n, it, key=key))
top_per_partition.sortByKey(ascending=False).zipWithIndex().filter(lambda xi: xi[1] < n).keys()
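
heapq.nlargest keeps only n candidate tuples per partition (compared by their first element via key), so the subsequent sort touches at most numPartitions * n records rather than the whole dataset.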

If the keys are much smaller than the values and the order of the final output doesn't matter, then a filter-based approach can work just fine:

keys = rdd.keys()
identity = lambda x: x

offset = (keys
    .mapPartitions(lambda it: heapq.nlargest(n, it))  # top-n candidate keys per partition
    .sortBy(identity, ascending=False)
    .zipWithIndex()
    .filter(lambda xi: xi[1] < n)                     # n largest candidates overall
    .keys()
    .min())                                           # n-th largest key

rdd.filter(lambda kv: kv[0] >= offset)
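
Here offset is the n-th largest key overall (the smallest of the n largest candidates), and the final filter keeps every tuple whose key reaches that threshold without shuffling the values.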

It also won't keep exactly n values if there are ties.

With DataFrames

You can just orderBy and limit:

from pyspark.sql.functions import col

rdd.toDF().orderBy(col("_1").desc()).limit(n)
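
Since the question asks for an RDD, the DataFrame result can be converted back; a minimal sketch, assuming the default _1/_2 column names that toDF assigns to tuple fields:

top_df = rdd.toDF().orderBy(col("_1").desc()).limit(n)
top_rdd = top_df.rdd.map(tuple)  # Row subclasses tuple, so this yields (key, value) pairs again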
