在 spark 中为 LDA 准备数据 [英] Preparing data for LDA in spark

查看:20
本文介绍了在 spark 中为 LDA 准备数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在努力实现 Spark LDA 模型(通过 Scala API),但在为我的数据执行必要的格式化步骤时遇到了问题.我的原始数据(存储在文本文件中)采用以下格式,本质上是令牌列表及其对应的文档.一个简化的例子:

doc XXXXX 术语 XXXXX1 x 'a' x1 x 'a' x1 x 'b' x2 x 'b' x2 x 'd' x...

XXXXX 列是我不关心的垃圾数据.我意识到这是存储语料库数据的一种非典型方式,但这就是我所拥有的.正如我希望从示例中清楚地看到的那样,原始数据中每个 token 有一行(因此,如果给定的术语在文档中出现 5 次,则对应于 5 行文本).>

无论如何,我需要将这些数据格式化为稀疏词频向量以运行 Spark LDA 模型,但我不熟悉 Scala,因此遇到了一些麻烦.

我开始于:

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}导入 org.apache.spark.mllib.linalg.{Vector, Vectors}导入 org.apache.spark.rdd.RDDval 语料库:RDD[Array[String]] = sc.textFile("path/to/data").map(_.split('\t')).map(x => Array(x(0),x(2)))

然后我得到生成稀疏向量所需的词汇数据:

val vocab: RDD[String] = corpus.map(_(1)).distinct()val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap

我不知道这里使用的正确映射函数,这样我最终会得到每个文档的稀疏词频向量,然后我可以将其输入 LDA 模型.我想我需要一些类似的东西...

val 文档:RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex.map(x =>(x._2,Vectors.sparse(vocabMap.size, ???)))

此时我可以运行实际的 LDA:

val lda = new LDA().setK(n_topics)val ldaModel = lda.run(文件)

基本上,我不知道将什么函数应用于每个组,以便我可以将词频数据(大概作为 map?)输入稀疏向量.换句话说,如何填写上面代码片段中的???才能达到想要的效果?

解决方案

一种处理方法:

  • 确保 spark-csv 包可用
  • 将数据加载到 DataFrame 并选择感兴趣的列

    val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true")//可选,优先提供模式.option("分隔符", "\t").load("foo.csv").select($"doc".cast("long").alias("doc"), $"term")

  • index term 列:

    import org.apache.spark.ml.feature.StringIndexerval indexer = new StringIndexer().setInputCol("术语").setOutputCol("termIndexed")val 索引 = indexer.fit(df).transform(df).drop("术语").withColumn("termIndexed", $"termIndexed".cast("integer")).groupBy($"doc", $"termIndexed").agg(count(lit(1)).alias("cnt").cast("double"))

  • 转换为PairwiseRDD

    import org.apache.spark.sql.Rowval 对 = indexed.map{case Row(doc: Long, term: Int, cnt: Double) =>(doc, (term, cnt))}

  • 按文档分组:

    val docs =pairs.groupByKey

  • 创建特征向量

    import org.apache.spark.mllib.linalg.Vectors导入 org.apache.spark.sql.functions.maxval n = indexed.select(max($"termIndexed")).first.getInt(0) + 1val docsWithFeatures = docs.mapValues(vs => Vectors.sparse(n, vs.toSeq))

  • 现在您拥有了创建 LabeledPoints 或应用额外处理所需的一切

I'm working on implementing a Spark LDA model (via the Scala API), and am having trouble with the necessary formatting steps for my data. My raw data (stored in a text file) is in the following format, essentially a list of tokens and the documents they correspond to. A simplified example:

doc XXXXX   term    XXXXX
1   x       'a'     x
1   x       'a'     x
1   x       'b'     x
2   x       'b'     x
2   x       'd'     x
...

Where the XXXXX columns are garbage data I don't care about. I realize this is an atypical way of storing corpus data, but it's what I have. As is I hope is clear from the example, there's one line per token in the raw data (so if a given term appears 5 times in a document, that corresponds to 5 lines of text).

In any case, I need to format this data as sparse term-frequency vectors for running a Spark LDA model, but am unfamiliar with Scala so having some trouble.

I start with:

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

val corpus:RDD[Array[String]] = sc.textFile("path/to/data")
    .map(_.split('\t')).map(x => Array(x(0),x(2)))

And then I get the vocabulary data I'll need to generate the sparse vectors:

val vocab: RDD[String] = corpus.map(_(1)).distinct()
val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap

What I don't know is the proper mapping function to use here such that I end up with a sparse term frequency vector for each document that I can then feed into the LDA model. I think I need something along these lines...

val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex
    .map(x =>(x._2,Vectors.sparse(vocabMap.size, ???)))

At which point I can run the actual LDA:

val lda = new LDA().setK(n_topics)
val ldaModel = lda.run(documents)

Basically, I don't what function to apply to each group so that I can feed term frequency data (presumably as a map?) into a sparse vector. In other words, how do I fill in the ??? in the code snippet above to achieve the desired effect?

解决方案

One way to handle this:

  • make sure that spark-csv package is available
  • load data into DataFrame and select columns of interest

    val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true") // Optional, providing schema is prefered
        .option("delimiter", "\t")
        .load("foo.csv")
        .select($"doc".cast("long").alias("doc"), $"term")
    

  • index term column:

    import org.apache.spark.ml.feature.StringIndexer
    
    val indexer = new StringIndexer()
      .setInputCol("term")
      .setOutputCol("termIndexed")
    
    val indexed = indexer.fit(df)
      .transform(df)
      .drop("term")
      .withColumn("termIndexed", $"termIndexed".cast("integer"))
      .groupBy($"doc", $"termIndexed")
      .agg(count(lit(1)).alias("cnt").cast("double"))
    

  • convert to PairwiseRDD

    import org.apache.spark.sql.Row
    
    val pairs = indexed.map{case Row(doc: Long, term: Int, cnt: Double) => 
      (doc, (term, cnt))}
    

  • group by doc:

    val docs = pairs.groupByKey
    

  • create feature vectors

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.sql.functions.max
    
    val n = indexed.select(max($"termIndexed")).first.getInt(0) + 1
    
    val docsWithFeatures = docs.mapValues(vs => Vectors.sparse(n, vs.toSeq))
    

  • now you have all you need to create LabeledPoints or apply additional processing

这篇关于在 spark 中为 LDA 准备数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆