How to convert RDD of dense vector into DataFrame in pyspark?


Problem description

I have an RDD of DenseVector like this:

>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]

I want to convert this into a DataFrame. I tried like this:

>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()

It gives an error like this:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
    schema = _infer_schema(first)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>

Old solution

frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))

Edit 1 - reproducible code

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split

from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector

sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')

sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", r"\s+"))
sentenceData.show()

vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")

idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0], DenseVector(vector[1].toArray())])  # note: a list, not a tuple
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

Answer

You cannot convert RDD[Vector] directly. It should be mapped to an RDD of objects which can be interpreted as structs, for example RDD[Tuple[Vector]]:

frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])
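Equivalently, the mapped RDD can be passed straight to createDataFrame; a minimal sketch, assuming the frequencyDenseVectors RDD from the question:

df = spark.createDataFrame(
    frequencyDenseVectors.map(lambda x: (x, )),  # wrap each vector in a one-element tuple
    ["rawfeatures"]
)
df.printSchema()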

Otherwise Spark will try to convert the object's __dict__ and end up using an unsupported NumPy array as a field.

from pyspark.ml.linalg import DenseVector  
from pyspark.sql.types import _infer_schema

v = DenseVector([1, 2, 3])
_infer_schema(v)

TypeError                                 Traceback (most recent call last)
... 
TypeError: not supported type: <class 'numpy.ndarray'>

versus

_infer_schema((v, ))

StructType(List(StructField(_1,VectorUDT,true)))
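A Row is interpreted as a struct as well; a quick check for comparison (the field name rawfeatures is just illustrative):

from pyspark.sql import Row

_infer_schema(Row(rawfeatures=v))

StructType(List(StructField(rawfeatures,VectorUDT,true)))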

Notes:

  • In Spark 2.0 you have to use the correct local types:
    • pyspark.ml.linalg when working with the DataFrame-based pyspark.ml API.
    • pyspark.mllib.linalg when working with the RDD-based pyspark.mllib API.

These two namespaces are no longer compatible and require explicit conversions (see for example How to convert from org.apache.spark.mllib.linalg.VectorUDT to ml.linalg.VectorUDT).
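A minimal sketch of such an explicit conversion, rebuilding the vector from its underlying array (variable names are illustrative):

from pyspark.mllib.linalg import DenseVector as MLLibDenseVector
from pyspark.ml.linalg import DenseVector as MLDenseVector

old_vec = MLLibDenseVector([1.0, 2.0, 3.0])  # RDD-based mllib namespace
new_vec = MLDenseVector(old_vec.toArray())   # DataFrame-based ml namespace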

The code provided in the edit is not equivalent to the one from the original question. You should be aware that tuple and list don't have the same semantics. If you map each vector to a pair, use a tuple and convert directly to a DataFrame:

tfidf.rdd.map(
    lambda row: (row[0], DenseVector(row[1].toArray()))
).toDF()
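Passing explicit column names to toDF avoids the default _1/_2 names (the names here are just illustrative):

tfidf.rdd.map(
    lambda row: (row[0], DenseVector(row[1].toArray()))
).toDF(["label", "rawfeatures"])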

Using tuple (a product type) would work for nested structures as well, but I doubt this is what you want:

(tfidf.rdd
    .map(lambda row: (row[0], DenseVector(row[1].toArray())))
    .map(lambda x: (x, ))
    .toDF())

A list at any place other than the top-level row is interpreted as an ArrayType.
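A quick check makes the difference visible, reusing the same private _infer_schema helper from above (so the exact output may vary between Spark versions):

_infer_schema(([1.0, 2.0], ))

StructType(List(StructField(_1,ArrayType(DoubleType,true),true)))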

It is much cleaner to use a UDF for the conversion (see Spark Python: Standard scaler error "Do not support ... SparseVector").
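A sketch of that UDF approach, assuming a DataFrame tfidf whose features column holds mllib vectors (names are illustrative):

from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, VectorUDT

# Declaring VectorUDT as the return type keeps the column a proper vector column.
to_ml_vector = udf(lambda v: DenseVector(v.toArray()), VectorUDT())
converted = tfidf.withColumn("features", to_ml_vector("features"))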

