Preparing data for LDA in Spark
Question
I'm working on implementing a Spark LDA model (via the Scala API), and am having trouble with the necessary formatting steps for my data. My raw data (stored in a text file) is in the following format, essentially a list of tokens and the documents they correspond to. A simplified example:
doc XXXXX term XXXXX
1 x 'a' x
1 x 'a' x
1 x 'b' x
2 x 'b' x
2 x 'd' x
...
Where the XXXXX columns are garbage data I don't care about. I realize this is an atypical way of storing corpus data, but it's what I have. As I hope is clear from the example, there's one line per token in the raw data (so if a given term appears 5 times in a document, that corresponds to 5 lines of text).
In any case, I need to format this data as sparse term-frequency vectors for running a Spark LDA model, but am unfamiliar with Scala so am having some trouble.
I started with:
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
val corpus: RDD[Array[String]] = sc.textFile("path/to/data")
  .map(_.split('\t')).map(x => Array(x(0), x(2)))
And then I get the vocabulary data I'll need to generate the sparse vectors:
val vocab: RDD[String] = corpus.map(_(1)).distinct()
val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap
What I don't know is the proper mapping function to use here such that I end up with a sparse term frequency vector for each document that I can then feed into the LDA model. I think I need something along these lines...
val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex
  .map(x => (x._2, Vectors.sparse(vocabMap.size, ???)))
At which point I can run the actual LDA:
val lda = new LDA().setK(n_topics)
val ldaModel = lda.run(documents)
Basically, I don't know what function to apply to each group so that I can feed term frequency data (presumably as a map?) into a sparse vector. In other words, how do I fill in the ??? in the code snippet above to achieve the desired effect?
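For reference, a minimal sketch of one way the ??? could be filled in while staying with the question's RDD approach, assuming vocabMap fits comfortably in driver memory:

val documents: RDD[(Long, Vector)] = corpus
  .groupBy(_(0))        // group token rows by document id
  .zipWithIndex         // assign a Long index to each document
  .map { case ((_, rows), docId) =>
    // map each token to its vocabulary index and count occurrences per document
    val counts = rows
      .map(r => vocabMap(r(1)))
      .groupBy(identity)
      .map { case (termIdx, occurrences) => (termIdx, occurrences.size.toDouble) }
    (docId, Vectors.sparse(vocabMap.size, counts.toSeq))
  }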
Answer
One way you can handle this:

- make sure that the spark-csv package is available (see the shell command sketched after the loading code below)
- load the data into a DataFrame and select the columns of interest:
import sqlContext.implicits._ // for the $"..." column syntax (already in scope in spark-shell)

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true") // optional; providing a schema explicitly is preferred
  .option("delimiter", "\t")
  .load("foo.csv")
  .select($"doc".cast("long").alias("doc"), $"term")
Index the term column:
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.functions.{count, lit}

val indexer = new StringIndexer()
  .setInputCol("term")
  .setOutputCol("termIndexed")

val indexed = indexer.fit(df)
  .transform(df)
  .drop("term")
  .withColumn("termIndexed", $"termIndexed".cast("integer"))
  .groupBy($"doc", $"termIndexed")
  .agg(count(lit(1)).cast("double").alias("cnt")) // per-document term frequency
convert to an RDD of (doc, (term, count)) pairs:
import org.apache.spark.sql.Row

val pairs = indexed.map { case Row(doc: Long, term: Int, cnt: Double) =>
  (doc, (term, cnt))
}
group by doc:
val docs = pairs.groupByKey // RDD[(Long, Iterable[(Int, Double)])]
create feature vectors:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.functions.max

// vector size = highest term index + 1
val n = indexed.select(max($"termIndexed")).first.getInt(0) + 1
val docsWithFeatures = docs.mapValues(vs => Vectors.sparse(n, vs.toSeq))
Now you have all you need to create LabeledPoints or apply additional processing.
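From here the vectors can be fed straight into the LDA code from the question; a minimal sketch, where n_topics is a placeholder:

import org.apache.spark.mllib.clustering.LDA

val n_topics = 10 // placeholder topic count; tune for your corpus
val ldaModel = new LDA().setK(n_topics).run(docsWithFeatures)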