高效计算Spark中的前k个元素 [英] Efficiently calculate top-k elements in spark
本文介绍了高效计算Spark中的前k个元素的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个类似的数据框:
I have a dataframe similarly to:
+---+-----+-----+
|key|thing|value|
+---+-----+-----+
| u1| foo| 1|
| u1| foo| 2|
| u1| bar| 10|
| u2| foo| 10|
| u2| foo| 2|
| u2| bar| 10|
+---+-----+-----+
并希望得到以下结果:
+---+-----+---------+----+
|key|thing|sum_value|rank|
+---+-----+---------+----+
| u1| bar| 10| 1|
| u1| foo| 3| 2|
| u2| foo| 12| 1|
| u2| bar| 10| 2|
+---+-----+---------+----+
当前,有类似的代码:
val df = Seq(("u1", "foo", 1), ("u1", "foo", 2), ("u1", "bar", 10), ("u2", "foo", 10), ("u2", "foo", 2), ("u2", "bar", 10)).toDF("key", "thing", "value")
// calculate sums per key and thing
val aggregated = df.groupBy("key", "thing").agg(sum("value").alias("sum_value"))
// get topk items per key
val k = lit(10)
val topk = aggregated.withColumn("rank", rank over Window.partitionBy("key").orderBy(desc("sum_value"))).filter('rank < k)
但是,此代码非常效率低.窗口功能会生成商品的总订单,并导致随机排序.
However, this code is very inefficient. A window function generates a total order of items and causes a gigantic shuffle.
我如何才能更有效地计算前k个项目? 也许使用近似函数,即类似于 https://datasketches.github.io/或 https://spark.apache.org/docs/latest/ml-frequent- pattern-mining.html
How can I calculate top-k items more efficiently? Maybe using approximate functions i.e. sketches similarly to https://datasketches.github.io/ or https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html
推荐答案
这是推荐系统的经典算法.
This is a classical algorithm of recommender systems.
case class Rating(thing: String, value: Int) extends Ordered[Rating] {
def compare(that: Rating): Int = -this.value.compare(that.value)
}
case class Recommendation(key: Int, ratings: Seq[Rating]) {
def keep(n: Int) = this.copy(ratings = ratings.sorted.take(n))
}
val TOPK = 10
df.groupBy('key)
.agg(collect_list(struct('thing, 'value)) as "ratings")
.as[Recommendation]
.map(_.keep(TOPK))
您还可以在以下位置查看源代码:
You can also check the source code at:
- Spotify大数据Rosetta代码/
TopByKeyAggregator.scala
,被认为是使用推荐算法时的最佳实践,尽管看起来他们的示例仍然使用RDD
.
- Spotify Big Data Rosetta Code /
TopItemsPerUser.scala
, several solutions here for Spark or Scio - Spark MLLib /
TopByKeyAggregator.scala
, considered the best practice when using their recommendation algorithm, it looks like their examples still usesRDD
though.
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
sc.parallelize(Array(("u1", ("foo", 1)), ("u1", ("foo", 2)), ("u1", ("bar", 10)), ("u2", ("foo", 10)),
("u2", ("foo", 2)), ("u2", ("bar", 10))))
.topByKey(10)(Ordering.by(_._2))
这篇关于高效计算Spark中的前k个元素的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文