Weighted Average in Spark
Question
I have two RDDs. The first, which I'll call userVisits, looks like this:
((123, someurl,Mon Nov 04 00:00:00 PST 2013),11.0)
and the second is allVisits:
((someurl,Mon Nov 04 00:00:00 PST 2013),1122.0)
I can do userVisits.reduceByKey(_ + _) to get the number of visits by that user, and the same for allVisits. What I want is a weighted average for each user: the user's visits divided by the total visits for the day. That means I need to look up a value in allVisits using part of the key tuple from userVisits. I'm guessing it could be done with a map like this:
userVisits.reduceByKey(_+_).map( item => item._2 / allVisits.get(item._1))
I know allVisits.get(key) doesn't exist, but how could I accomplish something like that?
The alternative is getting the keys from allVisits, mapping each count from userVisits, and then joining the two, but that seems inefficient.
Answer
The only universal option I see here is join:
val userVisitsAgg = userVisits.reduceByKey(_ + _)
val allVisitsAgg = allVisits.reduceByKey(_ + _)

userVisitsAgg
  // Re-key on (url, date) so the keys line up with allVisitsAgg.
  .map { case ((id, url, date), sum) => ((url, date), (id, sum)) }
  .join(allVisitsAgg)
  // Divide each user's sum by the total for that (url, date).
  .map { case ((url, date), ((id, userSum), urlSum)) =>
    ((id, url, date), userSum / urlSum)
  }
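As a quick sanity check, the join approach can be exercised on tiny in-memory data. This is a sketch: the sample values and the sc SparkContext are assumed, not taken from the question.

```scala
// Hypothetical sample data mirroring the shapes in the question.
val userVisits = sc.parallelize(Seq(
  (("123", "someurl", "Mon Nov 04 00:00:00 PST 2013"), 11.0),
  (("123", "someurl", "Mon Nov 04 00:00:00 PST 2013"), 22.0)
))
val allVisits = sc.parallelize(Seq(
  (("someurl", "Mon Nov 04 00:00:00 PST 2013"), 1122.0)
))

val userVisitsAgg = userVisits.reduceByKey(_ + _) // ((123, someurl, ...), 33.0)
val allVisitsAgg  = allVisits.reduceByKey(_ + _)  // ((someurl, ...), 1122.0)

val weighted = userVisitsAgg
  .map { case ((id, url, date), sum) => ((url, date), (id, sum)) }
  .join(allVisitsAgg)
  .map { case ((url, date), ((id, userSum), urlSum)) =>
    ((id, url, date), userSum / urlSum) // 33.0 / 1122.0 for the sample row
  }
```

Because the join is keyed on (url, date), each user's aggregated count is paired with exactly one day total before the division.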
If allVisitsAgg is small enough to be broadcast, you can simplify the above to something like this:
val allVisitsAggBD = sc.broadcast(allVisitsAgg.collectAsMap)

userVisitsAgg.map { case ((id, url, date), sum) =>
  // Look up the day's total directly from the broadcast map.
  ((id, url, date), sum / allVisitsAggBD.value((url, date)))
}
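One caveat with the broadcast variant (my own note, not part of the original answer): allVisitsAggBD.value((url, date)) throws a NoSuchElementException if a (url, date) key is missing from the broadcast map. A defensive sketch that drops such records instead of failing the task:

```scala
userVisitsAgg.flatMap { case ((id, url, date), sum) =>
  // Map.get returns an Option; flatMap silently skips missing keys.
  allVisitsAggBD.value.get((url, date)).map { urlSum =>
    ((id, url, date), sum / urlSum)
  }
}
```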