Spark: TopN after GroupBy
Question
I have an RDD P mapped to the class:
case class MyRating(userId:Int, itemId:Int, rating:Double)
I am interested in finding the TopN entries for each user, i.e. GroupBy userId and, within each formed group, filter out the TopN (say 10) entries based on highest rating.
I did the following:
val A : RDD[(Int, Iterable[MyRating])] = P.keyBy(r => r.userId).groupByKey
val B : RDD[(Int, List[MyRating])] = A.mapValues(iter => iter.toList.sortBy(-_.rating))
val C = B.take(10)
Clearly, applying .take(10) after groupByKey leaves me with only 10 keys (users) and does not filter out the top 10 ratings for each user.
How do we go about applying .take(N) after a groupBy so that it acts on some part of the value instead of the key itself?
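For reference, the intended per-user semantics can be made concrete on plain Scala collections, without Spark. This is a sketch with made-up sample data; it is not the distributed solution, only a specification of the expected result:

```scala
case class MyRating(userId: Int, itemId: Int, rating: Double)

// Made-up sample data for illustration
val ratings = List(
  MyRating(1, 10, 4.5), MyRating(1, 11, 3.0), MyRating(1, 12, 5.0),
  MyRating(2, 10, 2.0), MyRating(2, 13, 4.0)
)

val n = 2

// Group by user, sort each group by rating descending, keep the top n
val topN: Map[Int, List[MyRating]] =
  ratings.groupBy(_.userId).map { case (user, rs) =>
    user -> rs.sortBy(-_.rating).take(n)
  }
```

Here user 1's top 2 items are 12 (5.0) and 10 (4.5); user 2 keeps both entries, highest first.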
Answer
A naive approach is to take n values:

B.mapValues(_.take(n))

but if you need only a small subset of values it would be better to use, for example, aggregateByKey and drop obsolete records on the run instead of grouping everything. You probably want something more efficient in practice (you can check Spark's implementation of top / takeOrdered), but you can start with something like this:
import scala.math.Ordering
import scala.collection.mutable.PriorityQueue

implicit val ord = Ordering.by[MyRating, Double](_.rating)

val pairs = rdd.keyBy(_.userId)

pairs.aggregateByKey(new PriorityQueue[MyRating]())(
  (acc, x) => {
    acc.enqueue(x)
    acc.take(n)
  },
  (acc1, acc2) => (acc1 ++ acc2).take(n)
)
Note that the snippet above requires Scala 2.11+ due to SI-7568.
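The drop-as-you-go idea can be checked without a cluster. The seqOp/combOp pair below mirrors what aggregateByKey does for one key, but as a local sketch (not the exact snippet above): it uses a reversed ordering so the queue's head is the lowest-rated entry, which is evicted as soon as the queue exceeds n:

```scala
import scala.collection.mutable.PriorityQueue
import scala.math.Ordering

case class MyRating(userId: Int, itemId: Int, rating: Double)

val n = 2

// Reversed ordering: the queue head is the LOWEST-rated entry, so it is cheap to evict
implicit val ord: Ordering[MyRating] = Ordering.by[MyRating, Double](_.rating).reverse

// Merge one record into a bounded queue, dropping the weakest entry on the run
def seqOp(acc: PriorityQueue[MyRating], x: MyRating): PriorityQueue[MyRating] = {
  acc.enqueue(x)
  if (acc.size > n) acc.dequeue()
  acc
}

// Merge two partial queues (what aggregateByKey does across partitions)
def combOp(a: PriorityQueue[MyRating], b: PriorityQueue[MyRating]): PriorityQueue[MyRating] =
  b.foldLeft(a)(seqOp)

// Simulate one user's ratings arriving in two partitions (made-up data)
val part1 = List(MyRating(1, 10, 4.5), MyRating(1, 11, 3.0))
val part2 = List(MyRating(1, 12, 5.0), MyRating(1, 13, 1.0))

val acc = combOp(
  part1.foldLeft(PriorityQueue.empty[MyRating])(seqOp),
  part2.foldLeft(PriorityQueue.empty[MyRating])(seqOp)
)

val top = acc.dequeueAll.toList.reverse // highest rating first
```

With the reversed ordering, dequeueAll yields ascending ratings, so reversing it gives items 12 (5.0) then 10 (4.5); entries 11 and 13 were evicted along the way rather than after grouping everything.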