Take top N elements from each group in PySpark RDD (without using groupByKey)
Question
I have an RDD like the following:
dataSource = sc.parallelize([
    ("user1", (3, "blue")), ("user1", (4, "black")),
    ("user2", (5, "white")), ("user2", (3, "black")),
    ("user2", (6, "red")), ("user1", (1, "red"))])
I want to use reduceByKey to find the top 2 colors for each user, so the output would be an RDD like:
sc.parallelize([("user1", ["black", "blue"]), ("user2", ["red", "white"])])
so I need to reduce by key, then sort each key's (number, color) values by number and return the top n colors.
I don't want to use groupBy. If there is anything better than reduceByKey, other than groupBy, that would be great :)
Answer
You can, for example, use a heap queue. Required imports:
import heapq
from functools import partial
Helper functions:
def zero_value(n):
    """Initialize a queue. If n is large, it could be more efficient
    to track the number of elements on the heap (cnt, heap) and switch
    between heappush and heappushpop once we exceed n. I leave this
    as an exercise for the reader."""
    return [(float("-inf"), None) for _ in range(n)]

def seq_func(acc, x):
    heapq.heappushpop(acc, x)
    return acc

def merge_func(acc1, acc2, n):
    return heapq.nlargest(n, heapq.merge(acc1, acc2))

def finalize(kvs):
    return [v for (k, v) in kvs if k != float("-inf")]
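As a sanity check outside Spark, the helpers can be exercised with plain functools.reduce, which mirrors how aggregateByKey folds values within a partition and then merges partition results. This is a minimal sketch with a hypothetical two-partition split of user2's values:

```python
import heapq
from functools import reduce

def zero_value(n):
    # A heap of n sentinel entries; real values displace them via heappushpop.
    return [(float("-inf"), None) for _ in range(n)]

def seq_func(acc, x):
    # Push x, then pop the smallest, keeping only the n largest on the heap.
    heapq.heappushpop(acc, x)
    return acc

def merge_func(acc1, acc2, n):
    # nlargest scans every element of the merged iterator, so the result
    # is correct regardless of how the two accumulators interleave.
    return heapq.nlargest(n, heapq.merge(acc1, acc2))

def finalize(kvs):
    return [v for (k, v) in kvs if k != float("-inf")]

# user2's values split across two hypothetical partitions
part1 = [(5, "white"), (3, "black")]
part2 = [(6, "red")]

acc1 = reduce(seq_func, part1, zero_value(2))
acc2 = reduce(seq_func, part2, zero_value(2))
merged = merge_func(acc1, acc2, n=2)
print(finalize(merged))  # ['red', 'white']
```

Note that merge_func returns a list sorted in descending order, which finalize then strips of any remaining sentinels.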
Data:
rdd = sc.parallelize([
    ("user1", (3, "blue")), ("user1", (4, "black")),
    ("user2", (5, "white")), ("user2", (3, "black")),
    ("user2", (6, "red")), ("user1", (1, "red"))])
Solution:
(rdd
    .aggregateByKey(zero_value(2), seq_func, partial(merge_func, n=2))
    .mapValues(finalize)
    .collect())
Result:
[('user2', ['red', 'white']), ('user1', ['black', 'blue'])]
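For readers without a running Spark cluster, the whole pipeline can be simulated per key in plain Python. This is a rough sketch of what aggregateByKey computes here, ignoring partitioning entirely (every value folds into one accumulator per key):

```python
import heapq
from collections import defaultdict

data = [("user1", (3, "blue")), ("user1", (4, "black")),
        ("user2", (5, "white")), ("user2", (3, "black")),
        ("user2", (6, "red")), ("user1", (1, "red"))]

n = 2
# One fixed-size heap per user, seeded with -inf sentinels (zero_value).
accs = defaultdict(lambda: [(float("-inf"), None) for _ in range(n)])
for user, pair in data:
    heapq.heappushpop(accs[user], pair)  # seq_func applied per key

# Sort each heap descending and drop sentinels (merge_func + finalize).
result = {user: [c for (v, c) in heapq.nlargest(n, acc)]
          for user, acc in accs.items()}
print(result)  # {'user1': ['black', 'blue'], 'user2': ['red', 'white']}
```

This matches the collected Spark output above, up to the ordering of the keys.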