Spark Scala余弦相似度矩阵 [英] Spark Scala Cosine Similarity Matrix

查看:87
本文介绍了Spark Scala余弦相似度矩阵的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

scala ( pyspark 家伙)的新手,并尝试计算行(项目)之间的余弦相似度

New to scala (pyspark guy) and trying to calculated cosine similarity between rows (items)

接着,以创建示例df为例:

Followed this to create a sample df as an example:

Spark,Scala,DataFrame:创建特征向量

import org.apache.spark.ml.feature.VectorAssembler

val df = sc.parallelize(Seq(
  (1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), (2, "cat4", 6),
  (2, "cat9", 2), (2, "cat10", 1), (3, "cat1", 5), (3, "cat7", 16),
  (3, "cat8", 2))).toDF("userID", "category", "frequency")

// Create a sorted array of categories
val categories = df
  .select($"category")
  .distinct.map(_.getString(0))
  .collect
  .sorted

// Prepare vector assemble
val assembler =  new VectorAssembler()
  .setInputCols(categories)
  .setOutputCol("features")

// Aggregation expressions
val exprs = categories.map(
   c => sum(when($"category" === c, $"frequency").otherwise(lit(0))).alias(c))

val transformed = assembler.transform(
    df.groupBy($"userID").agg(exprs.head, exprs.tail: _*))
  .select($"userID", $"features")

transformed.show
+------+--------------------+
|userID|            features|
+------+--------------------+
|     1|(7,[0,2,6],[1.0,3...|
|     3|(7,[0,4,5],[5.0,1...|
|     2|(7,[1,3,6],[1.0,6...|
+------+--------------------+

尝试按照此帖子将df转换为 IndexedRowMatrix ,并在如何正确映射 rdd scala 语法上遇到麻烦

Trying to follow this post to convert the df into the IndexedRowMatrix and having trouble with the scala syntax on how to map the rdd properly

计算余弦相似度火花数据帧

import org.apache.spark.sql.Row

val irm = new IndexedRowMatrix(transformed.rdd.map {
  Row(_, v: org.apache.spark.ml.linalg.Vector) => 
    org.apache.spark.mllib.linalg.Vectors.fromML(v)
}.zipWithIndex.map { case (v, i) => IndexedRow(i, v) })



<console>:5: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
      Either create a single parameter accepting the Tuple1,
      or consider a pattern matching anonymous function: `{ case (param1, param1) => ... }
  Row(_, v: org.apache.spark.ml.linalg.Vector) =>
     ^

谢谢!

推荐答案

使用RowMatrix尝试一下:

Try this one, with RowMatrix :

def convertDataFrameToRowMatrix(df:DataFrame):RowMatrix = {
    val rows = df.count()
    val cols = df.columns.length
    val rdd:RDD[org.apache.spark.mllib.linalg.Vector] = df.rdd.map(
    row => org.apache.spark.mllib.linalg.Vectors.dense(row.getAs[Seq[Double]](1).toArray)
    val row = new IndexedRowMatrix(rdd,rows,cols)
    row
 }

并使用IndexedRowMatrix:

And with IndexedRowMatrix :

def convertDataFrameToIndexedMatrix(df:DataFrame):IndexedRowMatrix = {
    val rows:Long = df.count()
    val cols = df.columns.length
    val rdd = df.rdd.map(
    row => IndexedRow(rows, org.apache.spark.mllib.linalg.Vectors.dense(row.getAs[Seq[Double]](1).toArray)))
    val row = new IndexedRowMatrix(rdd,rows,cols)
    row
 }

如果要将IndexedRowMatrix或RowMatrix转换为RDD,则很简单:

If you want to convert an IndexedRowMatrix or RowMatrix to RDD, that's simple:

def convertIndexedRowMatrixToRDD(irm:IndexedRowMatrix):RDD[IndexedRow]=irm.rows

def convertRowMatrixToRDD(rm:RowMatrix):RDD[org.apache.spark.mllib.linalg.Vector] =rm.rows

如果要将其转换为DataFrame,请检查此链接.

If you want to convert it to DataFrame, check this link.

作为运行该功能的示例:

As an example of running the function:

val si = Seq((1,2), (3,4)) 
val myrdd:RDD[IndexedRow] = sc.parallelize(si).map(x => new IndexedRow(x._1.asInstanceOf[Long] ,Vectors.dense(x._1, x._2)))
val irm:IndexedRowMatrix = new IndexedRowMatrix(myrdd)
val r = convertIndexedRowMatrixToRDD(sc,irm)
val t = r.foreach(println)

输出:

IndexedRow(3,[3.0,4.0])
IndexedRow(1,[1.0,2.0])

这篇关于Spark Scala余弦相似度矩阵的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆