How to declare a sparse Vector in Spark with Scala?


Question

I'm trying to create a sparse Vector (the mllib.linalg.Vectors class, not the default one) but I can't understand how to use a Seq. I have a small test file with three numbers per line, which I convert to an RDD, split the text into doubles, and then group the lines by their first column.

Test file

1 2 4
1 3 5    
1 4 8    
2 7 5    
2 8 4    
2 9 10

Code

import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("/home/savvas/DWDM/test.txt")
val data2 = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
val grouped = data2.groupBy( _(0) )

This leaves grouped with these values:

(2.0,CompactBuffer([2.0,7.0,5.0], [2.0,8.0,4.0], [2.0,9.0,10.0]))
(1.0,CompactBuffer([1.0,2.0,4.0], [1.0,3.0,5.0], [1.0,4.0,8.0]))

But I can't seem to figure out the next step. I need to take each line of grouped and create a vector for it, so that each line of the new RDD has a vector where the third value of each CompactBuffer entry sits at the index given by the second value. In short, I want the data from the example to end up like this:

[0, 0, 0, 0, 0, 0, 5.0, 4.0, 10.0, 0]
[0, 4.0, 5.0, 8.0, 0, 0, 0, 0, 0, 0]

I know I need to use a sparse vector, and that there are three ways to construct it. I've tried using a Seq of tuple2(index, value) pairs, but I can't work out how to create such a Seq.
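For reference, the factory I was looking at is Vectors.sparse(size, elements: Seq[(Int, Double)]). A minimal sketch of what such a Seq might look like for the second group above, assuming a fixed vector size of 10:

import org.apache.spark.mllib.linalg.Vectors

// (index, value) pairs; every other position defaults to 0.0
val elements = Seq((6, 5.0), (7, 4.0), (8, 10.0))
val v = Vectors.sparse(10, elements)
// v: org.apache.spark.mllib.linalg.Vector = (10,[6,7,8],[5.0,4.0,10.0])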

Answer

One possible solution looks like the following. First, let's convert the data to the expected types:

import org.apache.spark.rdd.RDD

val pairs: RDD[(Double, (Int, Double))] = data.map(_.split(" ") match {
  case Array(label, idx, value) => (label.toDouble, (idx.toInt, value.toDouble))
})

Next, find the maximum index (the size of the vectors); for the sample file the largest index is 9, so the vectors have size 10:

val nCols = pairs.map{case (_, (i, _)) => i}.max + 1

分组和转换:

import org.apache.spark.mllib.linalg.SparseVector

def makeVector(xs: Iterable[(Int, Double)]) = {
  // sort by index and split into parallel index/value arrays
  val (indices, values) = xs.toArray.sortBy(_._1).unzip
  new SparseVector(nCols, indices.toArray, values.toArray)
}

val transformed: RDD[(Double, SparseVector)] = pairs
  .groupByKey
  .mapValues(makeVector)
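
For the sample file this produces one (label, SparseVector) pair per label; collecting it should print something along these lines (the ordering is not guaranteed):

transformed.collect().foreach(println)
// (1.0,(10,[2,3,4],[4.0,5.0,8.0]))
// (2.0,(10,[7,8,9],[5.0,4.0,10.0]))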

Another way to handle this, assuming that the first elements can be safely converted to and from integers, is to use CoordinateMatrix:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = data.map(_.split(" ") match {
  case Array(label, idx, value) => 
    MatrixEntry(label.toInt, idx.toInt, value.toDouble)
})

val transformed: RDD[(Double, SparseVector)] = new CoordinateMatrix(entries)
  .toIndexedRowMatrix
  .rows
  .map(row => (row.index.toDouble, row.vector.toSparse))
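
One thing to keep in mind with the CoordinateMatrix route: the first column is used as the row index of the matrix, so it effectively has to hold non-negative integers, and a label with no entries at all simply won't show up among the resulting rows.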
