MatchError while accessing vector column in Spark 2.0

Problem description

I am trying to create an LDA model on a JSON file.

Creating a Spark context with the JSON file:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()

val df = sparkSession.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")

Displaying df should show the DataFrame:

display(df)

Tokenize the text

import org.apache.spark.ml.feature.RegexTokenizer

// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
                .setPattern("[\\W_]+")
                .setMinTokenLength(4) // Filter away tokens with length < 4
                .setInputCol("text")
                .setOutputCol("tokens")

// Tokenize document
val tokenized_df = tokenizer.transform(df)

This should display the tokenized_df:

display(tokenized_df)

Get the stopwords

%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words -O /tmp/stopwords

Optional: copying the stopwords to the tmp folder

%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords

Collect all the stopwords

val stopwords = sc.textFile("/tmp/stopwords").collect()

Filter out the stopwords

 import org.apache.spark.ml.feature.StopWordsRemover

 // Set params for StopWordsRemover
 val remover = new StopWordsRemover()
                   .setStopWords(stopwords) // This parameter is optional
                   .setInputCol("tokens")
                   .setOutputCol("filtered")

 // Create new DF with Stopwords removed
 val filtered_df = remover.transform(tokenized_df)

Displaying the filtered df should verify the stopwords got removed:

 display(filtered_df)

Vectorize the frequency of word occurrences

 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.sql.Row
 import org.apache.spark.ml.feature.CountVectorizer

 // Set params for CountVectorizer
 val vectorizer = new CountVectorizer()
               .setInputCol("filtered")
               .setOutputCol("features")
               .fit(filtered_df)

Verify the vectorizer:

 vectorizer.transform(filtered_df)
           .select("id", "text","features","filtered").show()

After this I am seeing an issue when fitting this vectorizer in LDA. The issue, I believe, is that CountVectorizer gives a sparse vector but LDA requires a dense vector. Still trying to figure out the issue.

Here is the exception where the map is not able to convert:

import org.apache.spark.mllib.linalg.Vector
val ldaDF = countVectors.map { 
             case Row(id: String, countVector: Vector) => (id, countVector) 
            }
display(ldaDF)

The exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

There is a working sample for LDA which does not throw any issue:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}

val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF

val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) } 

val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)

The only difference is that in the second snippet we have a dense matrix.

Answer

This has nothing to do with sparsity. Since Spark 2.0.0, ML Transformers no longer generate o.a.s.mllib.linalg.VectorUDT but o.a.s.ml.linalg.VectorUDT, and the values are mapped locally to subclasses of o.a.s.ml.linalg.Vector. These are not compatible with the old MLlib API, which is moving towards deprecation in Spark 2.0.0.

You can convert to the "old" vector type using Vectors.fromML:

import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}

OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0))
OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0)))
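
Applied to the failing map from the question, the same conversion could look like the sketch below. This assumes countVectors is a DataFrame with id and features columns, produced by the vectorizer.transform(filtered_df) step shown above:

import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}

// Match on the new ml.linalg.Vector and convert it to the old mllib type,
// so the result can be fed to the RDD-based mllib LDA.
val ldaInput = countVectors.rdd.map {
  case Row(id: String, features: MLVector) => (id, OldVectors.fromML(features))
}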

That said, it makes more sense to use the ML implementation of LDA if you already use ML transformers.
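
A minimal sketch of that ML-based route, assuming the filtered_df and fitted vectorizer from the question (the parameter values k = 3 and maxIter = 10 are illustrative):

import org.apache.spark.ml.clustering.LDA

// ml.clustering.LDA consumes the new ml.linalg vectors in the "features"
// column directly, so no conversion is needed.
val countVectors = vectorizer.transform(filtered_df).select("id", "features")

val ldaModel = new LDA()
  .setK(3)           // number of topics (illustrative)
  .setMaxIter(10)    // illustrative
  .setFeaturesCol("features")
  .fit(countVectors)

ldaModel.describeTopics(maxTermsPerTopic = 5).show()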

For convenience you can use implicit conversions:

import scala.language.implicitConversions

object VectorConversions {
  import org.apache.spark.mllib.{linalg => mllib}
  import org.apache.spark.ml.{linalg => ml}

  implicit def toNewVector(v: mllib.Vector) = v.asML
  implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v)
}
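
For example, with these implicits in scope a new-style vector can be passed wherever the old type is expected (a small illustrative check):

import VectorConversions._
import org.apache.spark.ml.linalg.{Vectors => NewVectors}
import org.apache.spark.mllib.linalg.{Vector => OldVector}

// toOldVector is applied implicitly because an mllib.linalg.Vector is expected.
val asOld: OldVector = NewVectors.dense(1.0, 2.0, 3.0)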
