Preparing data for LDA in Spark
Problem description
I'm working on implementing a Spark LDA model (via the Scala API), and am having trouble with the necessary formatting steps for my data. My raw data (stored in a text file) is in the following format, essentially a list of tokens and the documents they correspond to. A simplified example:
doc XXXXX term XXXXX
1 x 'a' x
1 x 'a' x
1 x 'b' x
2 x 'b' x
2 x 'd' x
...
Where the XXXXX columns are garbage data I don't care about. I realize this is an atypical way of storing corpus data, but it's what I have. As I hope is clear from the example, there's one line per token in the raw data (so if a given term appears 5 times in a document, that corresponds to 5 lines of text).
In any case, I need to format this data as sparse term-frequency vectors for running a Spark LDA model, but I'm unfamiliar with Scala and so am having some trouble.
I start with:
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
val corpus: RDD[Array[String]] = sc.textFile("path/to/data")
  .map(_.split('\t'))
  .map(x => Array(x(0), x(2)))
And then I get the vocabulary data I'll need to generate the sparse vectors:
val vocab: RDD[String] = corpus.map(_(1)).distinct()
val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap
What I don't know is the proper mapping function to use here such that I end up with a sparse term frequency vector for each document that I can then feed into the LDA model. I think I need something along these lines...
val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex
  .map(x => (x._2, Vectors.sparse(vocabMap.size, ???)))
At which point I can run the actual LDA:
val lda = new LDA().setK(n_topics)
val ldaModel = lda.run(documents)
Basically, I don't know what function to apply to each group so that I can feed term-frequency data (presumably as a map?) into a sparse vector. In other words, how do I fill in the ??? in the code snippet above to achieve the desired effect?
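For reference, here is a minimal RDD-only sketch of what the ??? approach could look like, reusing the corpus and vocabMap defined above (a rough, untested outline rather than a canonical solution):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Broadcast the vocabulary map so it isn't shipped with every task
val vocabBC = sc.broadcast(vocabMap)

val documents: RDD[(Long, Vector)] = corpus
  .map { case Array(doc, term) => ((doc, term), 1.0) } // one input line per token
  .reduceByKey(_ + _)                                  // term frequency per (doc, term)
  .map { case ((doc, term), cnt) => (doc, (vocabBC.value(term), cnt)) }
  .groupByKey()                                        // all (termIndex, freq) pairs per doc
  .zipWithIndex()                                      // assign a Long id to each document
  .map { case ((_, counts), docId) =>
    (docId, Vectors.sparse(vocabBC.value.size, counts.toSeq))
  }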
One way to handle this:
- make sure that the spark-csv package is available
- load the data into a DataFrame and select the columns of interest:
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true") // optional; providing a schema is preferred
  .option("delimiter", "\t")
  .load("foo.csv")
  .select($"doc".cast("long").alias("doc"), $"term")
- index the term column:

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.functions.{count, lit}

val indexer = new StringIndexer()
  .setInputCol("term")
  .setOutputCol("termIndexed")

val indexed = indexer.fit(df)
  .transform(df)
  .drop("term")
  .withColumn("termIndexed", $"termIndexed".cast("integer"))
  .groupBy($"doc", $"termIndexed")
  .agg(count(lit(1)).alias("cnt").cast("double"))
- convert to a pair RDD:

import org.apache.spark.sql.Row

val pairs = indexed.map {
  case Row(doc: Long, term: Int, cnt: Double) => (doc, (term, cnt))
}
- group by doc:

val docs = pairs.groupByKey
- create feature vectors:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.functions.max

val n = indexed.select(max($"termIndexed")).first.getInt(0) + 1
val docsWithFeatures = docs.mapValues(vs => Vectors.sparse(n, vs.toSeq))
- now you have all you need to create LabeledPoints or apply additional processing
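From there, the LDA step from the question applies directly to docsWithFeatures; a minimal sketch, assuming a hypothetical topic count n_topics:

import org.apache.spark.mllib.clustering.LDA

val n_topics = 10 // hypothetical; set to however many topics you want
val lda = new LDA().setK(n_topics)
val ldaModel = lda.run(docsWithFeatures)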