How to declare a sparse Vector in Spark with Scala?
Question
I'm trying to create a sparse Vector (the mllib.linalg.Vectors class, not the default one), but I can't understand how to use Seq. I have a small test file with three numbers per line, which I convert to an RDD, split into doubles, and then group by the first column.
Test file
1 2 4
1 3 5
1 4 8
2 7 5
2 8 4
2 9 10
Code
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("/home/savvas/DWDM/test.txt")
val data2 = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
val grouped = data2.groupBy(_(0))
This results in grouped having these values:
(2.0,CompactBuffer([2.0,7.0,5.0], [2.0,8.0,4.0], [2.0,9.0,10.0]))
(1.0,CompactBuffer([1.0,2.0,4.0], [1.0,3.0,5.0], [1.0,4.0,8.0]))
But I can't seem to figure out the next step. I need to take each entry of grouped and create a vector for it, so that each line of the new RDD has a vector whose value at the index given by the second column of the CompactBuffer is that row's third value. In short, I want the data from the example to end up like this:
[0, 0, 0, 0, 0, 0, 5.0, 4.0, 10.0, 0]
[0, 4.0, 5.0, 8.0, 0, 0, 0, 0, 0, 0]
I know I need to use a sparse vector, and that there are three ways to construct one. I've tried using a Seq of (index, value) tuples, but I can't work out how to create such a Seq.
Answer
One possible solution is something like the one below. First, let's convert the data to the expected types:
import org.apache.spark.rdd.RDD
val pairs: RDD[(Double, (Int, Double))] = data.map(_.split(" ") match {
  case Array(label, idx, value) => (label.toDouble, (idx.toInt, value.toDouble))
})
Next, find the maximum index (the size of the vectors):
val nCols = pairs.map{case (_, (i, _)) => i}.max + 1
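To see why this gives the right size on the sample data: the second-column indices are 2, 3, 4, 7, 8 and 9, so the vectors need max + 1 = 10 slots. A local sketch of the same arithmetic, without Spark:

```scala
// Second-column indices taken from the sample file
val sampleIndices = Seq(2, 3, 4, 7, 8, 9)

// Same computation as the RDD version: largest index plus one
val nCols = sampleIndices.max + 1
println(nCols)  // prints 10
```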
Group and transform:
import org.apache.spark.mllib.linalg.SparseVector

def makeVector(xs: Iterable[(Int, Double)]) = {
  val (indices, values) = xs.toArray.sortBy(_._1).unzip
  new SparseVector(nCols, indices.toArray, values.toArray)
}

val transformed: RDD[(Double, SparseVector)] = pairs
  .groupByKey
  .mapValues(makeVector)
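The sortBy/unzip step inside makeVector is worth spelling out: it orders each group's (index, value) pairs by index, as SparseVector requires, and splits them into the two parallel arrays the constructor takes. A plain-Scala sketch on the group for label 2.0:

```scala
// One group as produced by groupByKey (order within a group is arbitrary)
val xs: Iterable[(Int, Double)] = Seq((8, 4.0), (7, 5.0), (9, 10.0))

// Sort by index, then split into the parallel arrays SparseVector expects
val (indices, values) = xs.toArray.sortBy(_._1).unzip
println(indices.mkString(","))  // prints 7,8,9
println(values.mkString(","))   // prints 5.0,4.0,10.0
```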
Alternatively, you can use a CoordinateMatrix:
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = data.map(_.split(" ") match {
  case Array(label, idx, value) =>
    MatrixEntry(label.toInt, idx.toInt, value.toDouble)
})

val transformed: RDD[(Double, SparseVector)] = new CoordinateMatrix(entries)
  .toIndexedRowMatrix
  .rows
  .map(row => (row.index.toDouble, row.vector.toSparse))
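As for the Seq the question asks about: Vectors.sparse has an overload that takes the size and a Seq of (index, value) tuples directly, so a single row can be built like this (using size 10 as computed above; note this needs spark-mllib on the classpath):

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// (index, value) pairs for the label-2.0 row; the Seq need not be sorted
val elems: Seq[(Int, Double)] = Seq((7, 5.0), (8, 4.0), (9, 10.0))
val v: Vector = Vectors.sparse(10, elems)
```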