Explode sparse features vector into separate columns


Problem Description

In my Spark DataFrame I have a column which contains the output of a CountVectoriser transformation - it is in sparse vector format. What I am trying to do is to 'explode' this column again into a dense vector and then into its component rows (so that it can be used for scoring by an external model).

I know there are 40 features in the column, so, following this example, I have tried:

import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector

// convert sparse vector to a dense vector, and then to array<double> 
val vecToSeq = udf((v: Vector) => v.toArray)

// Prepare a list of columns to create
val exprs = (0 until 40).map(i => $"_tmp".getItem(i).alias(s"exploded_col$i"))
testDF.select(vecToSeq($"features").alias("_tmp")).select(exprs:_*)

However, I get a weird error (see the full error below):

data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;

Now it appears that maybe the CountVectoriser created a vector of type 'ml.linalg.Vector', so I have alternatively tried importing:

import org.apache.spark.ml.linalg.{Vector, DenseVector, SparseVector}

And then I get an error Caused by:

Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row

I have also tried converting the ml vector by altering the UDF to:

val vecToSeq = udf((v: Vector) =>  org.apache.spark.mllib.linalg.Vectors.fromML(v.toDense).toArray )

And I get a similar cannot be cast to org.apache.spark.sql.Row error. Can anyone tell me why this is not working? Is there an easier way to explode a sparse vector in a DataFrame into separate columns? I've spent hours on this and cannot figure it out.

The schema shows the features column simply as a vector:

  |-- features: vector (nullable = true)
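Note that both the old mllib and the new ml vector UDTs are displayed simply as vector in the schema, which is presumably why the error message below looks contradictory. A quick way to check which class is actually stored in the column - a sketch, assuming the testDF DataFrame from above - is:

// Sketch: inspect the runtime class of the vector column (assumes testDF from above)
val vectorClass = testDF.select("features").head.get(0).getClass.getName
println(vectorClass)  // e.g. org.apache.spark.ml.linalg.SparseVector for CountVectorizer output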

Full error trace:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(features)' due to data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;;
Project [UDF(features#325) AS _tmp#463]
. . . 
org.apache.spark.sql.cassandra.CassandraSourceRelation@47eae91d

        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2872)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1153)
        at uk.nominet.renewals.prediction_test$.prediction_test(prediction_test.scala:292)
        at 

Answer

It appears to be an issue with your import statements. As you noticed, CountVectorizer uses the ml package vectors; therefore, all vector imports should also use this package. Make sure you do not have any imports from the older mllib package. These include:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.DenseVector
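
For reference, the ml-package counterparts look like this (a sketch - import only what your code actually uses):

// ml-package equivalents of the above (Spark 2.x and later)
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.linalg.DenseVector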

Some methods are only present in the mllib package, so if you actually need to use that type of vector, you can rename it on import (since the class names are otherwise the same as the ml vectors). For example:

import org.apache.spark.mllib.linalg.{Vector => mllibVector}
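
A brief, hypothetical usage of that alias, converting an ml vector to its mllib counterpart:

// Hypothetical example: mlVec is just a placeholder ml vector
val mlVec: org.apache.spark.ml.linalg.Vector = org.apache.spark.ml.linalg.Vectors.dense(1.0, 0.0, 2.0)
val legacyVec: mllibVector = org.apache.spark.mllib.linalg.Vectors.fromML(mlVec)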

After fixing all imports, your code should run. Test:

import org.apache.spark.ml.feature.CountVectorizer
import spark.implicits._  // needed for toDF and the $ column syntax (spark is the SparkSession)

val df = Seq((1L, Seq("word1", "word2", "word3")), (2L, Seq("word2", "word4"))).toDF("id", "words")
val countVec = new CountVectorizer().setInputCol("words").setOutputCol("features")
val testDF = countVec.fit(df).transform(df)

This will give a test dataframe as follows:

+---+--------------------+--------------------+
| id|               words|            features|
+---+--------------------+--------------------+
|  1|[word1, word2, wo...|(4,[0,2,3],[1.0,1...|
|  2|      [word2, word4]| (4,[0,1],[1.0,1.0])|
+---+--------------------+--------------------+

Now, to give each index its own column:

import org.apache.spark.ml.linalg.Vector

// Convert the ml vector to array<double> so individual elements can be selected
val vecToSeq = udf((v: Vector) => v.toArray)

val exprs = (0 until 4).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
val df2 = testDF.withColumn("features", vecToSeq($"features")).select(exprs:_*)

Which gives the resulting dataframe:

+-------------+-------------+-------------+-------------+
|exploded_col0|exploded_col1|exploded_col2|exploded_col3|
+-------------+-------------+-------------+-------------+
|          1.0|          0.0|          1.0|          1.0|
|          1.0|          1.0|          0.0|          0.0|
+-------------+-------------+-------------+-------------+
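
As a side note (not part of the original answer): on Spark 3.0 and later, the built-in org.apache.spark.ml.functions.vector_to_array function avoids the hand-written UDF entirely. A sketch:

import org.apache.spark.ml.functions.vector_to_array

// vector_to_array converts an ml (or mllib) vector column into array<double>
val exprs2 = (0 until 4).map(i => vector_to_array($"features").getItem(i).alias(s"exploded_col$i"))
val df3 = testDF.select(exprs2:_*)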
