Spark two-level aggregation

Question

My RDD contains three fields: [Group, User, Bytes]. I need to sum the bytes consumed by each user within a group, and then get the top N users by total bytes per group.

For example, with input data such as:

G1 U1 10
G1 U1 20
G1 U2 25
G1 U3 20
G2 U1 30
G2 U2 15
G2 U2 25
G2 U3 45

a top-2 query should return:

G1 U1 30
G1 U2 25
G2 U3 45
G2 U2 40

So far my code is as follows:

// rdd: RDD[((String, String), Double)], keyed by (group, user)
rdd.reduceByKey((x, y) => x + y)
    .map {
       x => (x._1._1, (x._1._2, x._2))   // (group, (user, totalBytes))
    }.sortBy(x => x._2._2, ascending = false)

I have yet to figure out how to group by the Group value and then keep only the top N results per group. Can anyone help with that, or suggest a better way to solve this?

Recommended answer

From your question, it seems you are trying to compute a SQL-style rank within each group.

Here is my solution. It might not be the most efficient, but it works.

val rddsum = rdd.reduceByKey((x,y) => (x+y)).map(x => (x._1._1,x._1._2,x._2))

This gives the per-user totals as flat (group, user, bytes) tuples:

(G1, U1, 30)
(G1, U2, 25)
(G1, U3, 20)
(G2, U1, 30)
(G2, U2, 40)
(G2, U3, 45)

Now group by the first column and use mapValues to attach a rank:

val grpd = rddsum.groupBy{x => x._1}

// Sort descending by bytes so that rank 1 is the largest total
val sortAndRankedItems = grpd.mapValues{ it => it.toList.sortBy{x => -x._3}.zip(Stream from 1) }
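
To see what the ranking step does inside a single group, here is a minimal sketch on a plain Scala list (no Spark needed). The rows are G1's totals from the table above. I use zipWithIndex in place of zip(Stream from 1), which is my own substitution. Note that the sort has to be descending for rank 1 to mean the largest total.

```scala
// Per-user totals for one group, as (group, user, bytes).
val g1 = List(("G1", "U1", 30.0), ("G1", "U2", 25.0), ("G1", "U3", 20.0))

// Sort by bytes, largest first, then pair each row with a 1-based rank.
val ranked = g1
  .sortBy(x => -x._3)
  .zipWithIndex
  .map { case (row, i) => (row, i + 1) }

// Keep only the rows ranked 1 or 2, and drop the rank again.
val top2 = ranked.filter(_._2 <= 2).map(_._1)

println(top2) // List((G1,U1,30.0), (G1,U2,25.0))
```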

Now sortAndRankedItems is of type RDD[(String, List[((String, String, Double), Int)])]. Only the second element of each pair is of interest, so flatMap over it, keep the elements whose rank is within the top N (2 in this case), and then drop the rank to leave just the (group, user, bytes) tuple.

val result = sortAndRankedItems.flatMap{case(m,n) => n}.filter{x => x._2 <= 2}.map{case(x,y) => x}
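
For anyone who wants to try the whole thing end to end, here is a rough sketch run against the sample data from the question. It assumes a standalone script with Spark on the classpath; the app name, the local master and the topN value are placeholders of mine. It also differs slightly from the steps above: it uses groupByKey and take(topN) in place of groupBy and the explicit rank filter, which should select the same rows.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Local context for experimentation only (in spark-shell, reuse the provided `sc` instead).
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("topn-sketch"))

val topN = 2

// ((group, user), bytes), matching the sample input.
val rdd = sc.parallelize(Seq(
  (("G1", "U1"), 10.0), (("G1", "U1"), 20.0), (("G1", "U2"), 25.0), (("G1", "U3"), 20.0),
  (("G2", "U1"), 30.0), (("G2", "U2"), 15.0), (("G2", "U2"), 25.0), (("G2", "U3"), 45.0)
))

val result = rdd
  .reduceByKey(_ + _)                                   // total bytes per (group, user)
  .map { case ((group, user), bytes) => (group, (user, bytes)) }
  .groupByKey()                                         // gather each group's users
  .flatMap { case (group, users) =>
    users.toList
      .sortBy { case (_, bytes) => -bytes }             // largest totals first
      .take(topN)
      .map { case (user, bytes) => (group, user, bytes) }
  }

// Row order across groups is not guaranteed after collect().
val collected = result.collect()
collected.foreach(println)

sc.stop()
```

Like groupBy, groupByKey pulls every user of a group into memory on one executor, so this is fine for modest group sizes but is not the most efficient option for very large groups.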

Hope it helps!
