MatchError while accessing vector column in Spark 2.0


Problem description

I am trying to create an LDA model on a JSON file.

Creating a Spark session and loading the JSON file:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()

val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")

Displaying df should show the DataFrame:

display(df)

Tokenize the text

import org.apache.spark.ml.feature.RegexTokenizer

// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
                .setPattern("[\\W_]+")
                .setMinTokenLength(4) // Filter away tokens with length < 4
                .setInputCol("text")
                .setOutputCol("tokens")

// Tokenize document
val tokenized_df = tokenizer.transform(df)

This should display the tokenized_df:

display(tokenized_df)

Get the stopwords

%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words -O /tmp/stopwords

Optional: copying the stopwords to the tmp folder

%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords

Collect all the stopwords

val stopwords = sc.textFile("/tmp/stopwords").collect()

Filter out the stopwords

import org.apache.spark.ml.feature.StopWordsRemover

// Set params for StopWordsRemover
val remover = new StopWordsRemover()
                  .setStopWords(stopwords) // This parameter is optional
                  .setInputCol("tokens")
                  .setOutputCol("filtered")

// Create new DF with Stopwords removed
val filtered_df = remover.transform(tokenized_df)

Displaying the filtered df should verify the stopwords got removed

display(filtered_df)

Vectorize the frequency of words

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.CountVectorizer

// Set params for CountVectorizer
val vectorizer = new CountVectorizer()
              .setInputCol("filtered")
              .setOutputCol("features")
              .fit(filtered_df)

Verify the vectorizer

vectorizer.transform(filtered_df)
          .select("id", "text", "features", "filtered").show()

After this I am seeing an issue when fitting this vectorizer output into LDA. The issue, I believe, is that CountVectorizer is giving a sparse vector but LDA requires a dense vector. I am still trying to figure out the issue.
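(For reference, the countVectors DataFrame used in the snippet below is not shown above; presumably it is the CountVectorizer output restricted to the document id and the term-count column, roughly as follows.)

// Assumed definition of countVectors, using the column names produced above
val countVectors = vectorizer.transform(filtered_df).select("id", "features")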

Here is the exception where the map is not able to convert:

import org.apache.spark.mllib.linalg.Vector
val ldaDF = countVectors.map { 
             case Row(id: String, countVector: Vector) => (id, countVector) 
            }
display(ldaDF)

Exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

Here is a working sample for LDA which does not throw any issue:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}

val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF

val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) } 

val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)

The only difference is that in the second snippet we have dense vectors.

Recommended answer

This has nothing to do with sparsity. Since Spark 2.0.0, ML transformers no longer generate o.a.s.mllib.linalg.VectorUDT but o.a.s.ml.linalg.VectorUDT, and the values are mapped locally to subclasses of o.a.s.ml.linalg.Vector. These are not compatible with the old MLlib API, which is moving towards deprecation in Spark 2.0.0.
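Concretely, the row in the error message carries an o.a.s.ml.linalg.Vector, so a pattern match against the old o.a.s.mllib.linalg.Vector type falls through. A minimal sketch of the corrected match, assuming countVectors has a String id column followed by the features column as in the question:

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.sql.Row

// Matching on the new ml Vector type succeeds, because that is what
// Spark 2.0 ML transformers actually put into the DataFrame
val ldaDF = countVectors.rdd.map {
  case Row(id: String, countVector: MLVector) => (id, countVector)
}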

You can convert to the "old" types using Vectors.fromML:

import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}

OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0))
OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0)))
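Applied to the question's data, the conversion can be done inside the map so the result is still usable with the old mllib LDA; a sketch, assuming countVectors holds an id column followed by the ml features column (the K value is arbitrary):

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.clustering.{LDA => OldLDA}
import org.apache.spark.sql.Row

// Convert each ml vector to the old mllib type and pair it with a Long id,
// which is what mllib's LDA.run expects
val oldLdaInput = countVectors.rdd
  .map { case Row(_, v: MLVector) => OldVectors.fromML(v) }
  .zipWithIndex()
  .map { case (v, docId) => (docId, v) }

val oldModel = new OldLDA().setK(10).run(oldLdaInput)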

But it makes more sense to use the ML implementation of LDA if you already use ML transformers.
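For instance, the ml LDA estimator consumes the CountVectorizer output directly, with no vector conversion at all; a minimal sketch using the DataFrame from the question (the K and iteration values are arbitrary):

import org.apache.spark.ml.clustering.LDA

// The ml estimator works on a DataFrame with a vector features column,
// which is exactly what the ML CountVectorizer produces
val lda = new LDA()
  .setK(10)
  .setMaxIter(20)
  .setFeaturesCol("features")

val ldaModel = lda.fit(countVectors)
ldaModel.describeTopics(5).show()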

For convenience you can use implicit conversions:

import scala.language.implicitConversions

object VectorConversions {
  import org.apache.spark.mllib.{linalg => mllib}
  import org.apache.spark.ml.{linalg => ml}

  implicit def toNewVector(v: mllib.Vector) = v.asML
  implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v)
}
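With these in scope, a value of either vector type can be passed where the other is expected, for example (a hypothetical usage sketch):

import VectorConversions._
import org.apache.spark.ml.{linalg => ml}
import org.apache.spark.mllib.{linalg => mllib}

val newVec: ml.Vector = ml.Vectors.dense(1.0, 2.0, 3.0)

// Implicitly converted to the old type by toOldVector (Vectors.fromML)
val oldVec: mllib.Vector = newVec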
