Optimize Spark job that has to calculate each-to-each entry similarity and output top N similar items for each


Problem description


I have a Spark job that needs to compute movie content-based similarities. There are 46k movies. Each movie is represented by a set of SparseVectors (each vector is a feature vector for one of the movie's fields such as Title, Plot, Genres, Actors, etc.). For Actors and Genres, for example, the vector shows whether a given actor is present (1) or absent (0) in the movie.


The task is to find top 10 similar movies for each movie. I managed to write a script in Scala that performs all those computations and does the job. It works for smaller sets of movies such as 1000 movies but not for the whole dataset (out of memory, etc.).


The way I do this computation is by using a cross join on the movies dataset, then reducing the problem by only taking rows where movie1_id < movie2_id. Still, at this point the dataset will contain 46000^2/2 = 1,058,000,000 rows, and each row carries a significant amount of data.
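Roughly, that step looks like this (the movies DataFrame and column names are simplified for illustration):

import spark.implicits._

// movies: DataFrame of the 46k movie records, one row per movie
val pairs = movies.as("m1")
  .crossJoin(movies.as("m2"))
  .filter($"m1.imdbID" < $"m2.imdbID")  // keep each unordered pair exactly once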


Then I calculate the similarity score for each row. Once the similarity is calculated, I group the results by movie1_id and sort them in descending order of similarity score using a Window function, taking the top N items (similar to what is described here: Spark get top N highest score results for each (item1, item2, score)).
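The top-N step is roughly the following (assuming a scored DataFrame with movie1_id, movie2_id and score columns; names are illustrative):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Rank candidates within each movie1_id partition and keep the 10 best
val w = Window.partitionBy($"movie1_id").orderBy($"score".desc)

val top10 = scored
  .withColumn("rank", row_number().over(w))
  .filter($"rank" <= 10)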


The question is - can it be done more efficiently in Spark? E.g. without having to perform a crossJoin?


And another question - how does Spark deal with such huge DataFrames (1,058,000,000 rows consisting of multiple SparseVectors)? Does it have to keep all of this in memory at once, or does it process such DataFrames piece by piece somehow?


I'm using the following function to calculate similarity between movie vectors:

def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
  val a: BSV[Double] = toBreeze(movie1Vec)
  val b: BSV[Double] = toBreeze(movie2Vec)

  // Dot product over the active (non-zero) entries of a only
  var dot: Double = 0
  var offset: Int = 0
  while (offset < a.activeSize) {
    val index: Int = a.indexAt(offset)
    val value: Double = a.valueAt(offset)
    dot += value * b(index)
    offset += 1
  }

  // Restrict b to the indices that are active in a before computing its magnitude
  val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.index.length)
  val maga: Double = magnitude(a)
  val magb: Double = magnitude(bReduced)

  if (maga == 0 || magb == 0) 0.0
  else dot / (maga * magb)
}
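The toBreeze and magnitude helpers are omitted above; presumably they do something along these lines (an assumed sketch, not the original code):

import breeze.linalg.{SparseVector => BSV}
import org.apache.spark.ml.linalg.SparseVector

// Assumed helpers (not part of the original snippet)
def toBreeze(v: SparseVector): BSV[Double] =
  new BSV[Double](v.indices, v.values, v.size)

def magnitude(v: BSV[Double]): Double =
  math.sqrt(v.activeValuesIterator.map(x => x * x).sum)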


Each row in the Dataframe consists of two joined classes:

final case class MovieVecData(imdbID: Int,
                              Title: SparseVector,
                              Decade: SparseVector,
                              Plot: SparseVector,
                              Genres: SparseVector,
                              Actors: SparseVector,
                              Countries: SparseVector,
                              Writers: SparseVector,
                              Directors: SparseVector,
                              Productions: SparseVector,
                              Rating: Double
                             )

Answer


It can be done more efficiently, as long as you are fine with approximations and don't require exact results (or an exact number of results).



Similarly to my answer to Efficient string matching in Apache Spark, you can use LSH; a sketch of one option is given below.
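The linked answer does not fix a particular LSH implementation; one plausible option for the binary feature vectors here is Spark ML's MinHashLSH, which approximates Jaccard (not cosine) distance. A rough sketch, with illustrative parameter values:

import org.apache.spark.ml.feature.MinHashLSH

// numHashTables and the 0.6 distance threshold are illustrative values
val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(df)

// Approximate self-join: keeps only pairs with Jaccard distance below the threshold
val candidates = model.approxSimilarityJoin(df, df, 0.6, "JaccardDistance")
  .filter($"datasetA.id" < $"datasetB.id")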


If the feature space is small (or can be reasonably reduced) and each category is relatively small, you can also optimize your code by hand:

  • explode the feature array to generate #features records from a single record.
  • Self-join the result by feature, compute the distance, and filter candidate pairs (a pair of records is compared if and only if it shares a specific categorical feature).
  • Take the top records using your current code.


A minimal example would be (consider it pseudocode):

import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions._
import spark.implicits._

// This is oversimplified. In practice don't assume only the sparse scenario
val indices = udf((v: SparseVector) => v.indices)

val df = Seq(
  (1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
  (2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
  (3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
  (4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
  (5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")

// Explode the active indices so that records are joined only when they share a feature
val possibleMatches = df
  .withColumn("key", explode(indices($"features")))
  .transform(df => df.alias("left").join(df.alias("right"), Seq("key")))

def closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) =>
  intersectionCosine(v1, v2) > threshold)

possibleMatches
  .filter(closeEnough(0.5)($"left.features", $"right.features"))  // threshold value is illustrative
  .select($"left.id", $"right.id")
  .distinct


Note that both solutions are worth the overhead only if the hashing / features are selective enough (and optimally sparse). In the example shown above you'd compare only rows inside the sets {1, 2, 3} and {4, 5}, never between sets.


However, in the worst-case scenario (M records, N features) we can make N·M² comparisons, instead of M².

