Explode sparse features vector into separate columns


Question

In my Spark DataFrame I have a column which contains the output of a CountVectorizer transformation - it is in sparse vector format. What I am trying to do is to 'explode' this column again into a dense vector and then into its component rows (so that it can be used for scoring by an external model).

I know there are 40 features in the column, hence, following this example, I have tried:

import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector

// convert sparse vector to a dense vector, and then to array<double> 
val vecToSeq = udf((v: Vector) => v.toArray)

// Prepare a list of columns to create
val exprs = (0 until 39).map(i => $"_tmp".getItem(i).alias(s"exploded_col$i"))
testDF.select(vecToSeq($"features").alias("_tmp")).select(exprs:_*)

However, I get a weird error (see the full error below):

data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;

Now it appears that maybe the CountVectorizer created a vector of type 'ml.linalg.Vector', so I have alternatively tried importing:

import org.apache.spark.ml.linalg.{Vector, DenseVector, SparseVector}

And then I get an error Caused by:

Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row

I have also tried converting the ml vector by altering the UDF to:

val vecToSeq = udf((v: Vector) =>  org.apache.spark.mllib.linalg.Vectors.fromML(v.toDense).toArray )

and get a similar cannot be cast to org.apache.spark.sql.Row error. Can anyone tell me why this is not working? Is there an easier way to explode a sparse vector in a DataFrame into separate columns? I've spent hours on this and cannot figure it out.

The schema shows the feature column just as a vector:

  |-- features: vector (nullable = true)

Full error trace:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(features)' due to data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;;
Project [UDF(features#325) AS _tmp#463]
. . . 
org.apache.spark.sql.cassandra.CassandraSourceRelation@47eae91d

        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2872)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1153)
        at uk.nominet.renewals.prediction_test$.prediction_test(prediction_test.scala:292)
        at 

Answer

It appears to be an issue with your import statements. As you noticed, CountVectorizer uses the ml package vectors, therefore all vector imports should also use this package. Make sure you do not have any imports from the older mllib package. These include:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.DenseVector
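
For reference, the ml-package equivalents of those imports (the ones that match the vectors CountVectorizer actually produces) look like this - a minimal sketch of what you would import instead:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.linalg.DenseVector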

Some methods are only present in the mllib package, so if you actually need to use that type of vector you can rename it on import (since the names are otherwise the same as the ml vectors). For example:

import org.apache.spark.mllib.linalg.{Vector => mllibVector}
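
As a hedged sketch of how such a rename is typically used (the helper name toMllib below is purely illustrative; Vectors.fromML has existed in org.apache.spark.mllib.linalg since Spark 2.0):

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vector => MLlibVector, Vectors => MLlibVectors}

// bridge helper: convert an ml vector to an mllib vector for APIs that still expect the old type
def toMllib(v: MLVector): MLlibVector = MLlibVectors.fromML(v)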

After fixing all imports, your code should run. Test:

import org.apache.spark.ml.feature.CountVectorizer

val df = Seq((1L, Seq("word1", "word2", "word3")), (2L, Seq("word2", "word4"))).toDF("id", "words")
val countVec = new CountVectorizer().setInputCol("words").setOutputCol("features")
val testDF = countVec.fit(df).transform(df)

This will give a test DataFrame as follows:

+---+--------------------+--------------------+
| id|               words|            features|
+---+--------------------+--------------------+
|  1|[word1, word2, wo...|(4,[0,2,3],[1.0,1...|
|  2|      [word2, word4]| (4,[0,1],[1.0,1.0])|
+---+--------------------+--------------------+

Now, to give each index its own column:

// UDF that converts the ml Vector (org.apache.spark.ml.linalg.Vector) to array<double>
val vecToSeq = udf((v: Vector) => v.toArray)

// one output column per vector index
val exprs = (0 until 4).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
val df2 = testDF.withColumn("features", vecToSeq($"features")).select(exprs:_*)

The resulting DataFrame:

+-------------+-------------+-------------+-------------+
|exploded_col0|exploded_col1|exploded_col2|exploded_col3|
+-------------+-------------+-------------+-------------+
|          1.0|          0.0|          1.0|          1.0|
|          1.0|          1.0|          0.0|          0.0|
+-------------+-------------+-------------+-------------+
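
As a side note, on Spark 3.0 and later the custom UDF is not strictly necessary: org.apache.spark.ml.functions.vector_to_array converts an ml vector column straight to array<double>. A minimal sketch of that variant, assuming the same testDF and that you also want to keep the id column:

import org.apache.spark.ml.functions.vector_to_array

// Spark 3.0+: built-in vector-to-array conversion, no UDF required
val arrayDF = testDF.withColumn("features", vector_to_array($"features"))
val cols = $"id" +: (0 until 4).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
val exploded = arrayDF.select(cols: _*)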
