Pyspark update the value in feature vector


Question

I am building a text classifier and am using Spark's CountVectorizer to create the feature vector.
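
For reference, the feature column shown below was produced roughly along these lines (a simplified sketch; the input DataFrame, token column name, and vocabulary size here are placeholders, not the actual pipeline):

from pyspark.ml.feature import CountVectorizer

# Hypothetical setup: tokenized_df has an array-of-tokens column named 'words'
cv = CountVectorizer(inputCol='words', outputCol='features', vocabSize=1000)
vectorizer_df = cv.fit(tokenized_df).transform(tokenized_df)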

Now, to use this vector with the BigDL library, I need to convert every 0 in the feature vector to 1.

Here is my feature vector, which is a sparse vector:

vectorizer_df.select('features').show(2)
+--------------------+
|            features|
+--------------------+
|(1000,[4,6,11,13,...|
|(1000,[0,1,2,3,4,...|
+--------------------+
only showing top 2 rows
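
For reference, a sparse vector like this prints as (size, [indices], [values]), and every position not listed is an implicit zero. A small standalone illustration (not from the actual data):

from pyspark.ml.linalg import Vectors

sv = Vectors.sparse(5, [1, 3], [2.0, 7.0])
print(sv)            # (5,[1,3],[2.0,7.0])
print(sv.toArray())  # [0. 2. 0. 7. 0.]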

I am trying to update the values as follows, first converting the sparse vector to a dense vector:

from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import udf

update_vector = udf(lambda vector: Vectors.dense(vector), VectorUDT())


df = vectorizer_df.withColumn('features',update_vector(vectorizer_df.features))

df.select('features').show(2)
+--------------------+
|            features|
+--------------------+
|[0.0,0.0,0.0,0.0,...|
|[5571.0,4688.0,24...|
+--------------------+
only showing top 2 rows

Once I have the dense vector, I try to add 1 to every element:

import numpy as np

def add1(x):
    return x+1

def array_for(x):
    return np.array([add1(xi) for xi in x])

add_udf_one = udf(lambda z: array_for(z), VectorUDT())

df = df.select('features', add_udf_one('features').alias('feature_1'))

df.select('feature_1').show(2)

But now I get a TypeError, as shown below:

TypeError: cannot serialize array([  ....]) of type <class 'numpy.ndarray'>

The full error is as follows:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-98-5aa5196824cf> in <module>
----> 1 df.select('feature_1').show(2)

/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    348         """
    349         if isinstance(truncate, bool) and truncate:
--> 350             print(self._jdf.showString(n, 20, vertical))
    351         else:
    352             print(self._jdf.showString(n, int(truncate), vertical))

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1192.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 4886, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 73, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 682, in toInternal
    return self._cachedSqlType().toInternal(self.serialize(obj))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 169, in serialize
    raise TypeError("cannot serialize %r of type %r" % (obj, type(obj)))
TypeError: cannot serialize array([  1.,   1.,   1.,   1.,   2.,   1., 326.,   1.,   1.,   1.,   1.,
         2.,   1.,   3.,   1.,   1.,   1.,   1., 383.,   1., 312.,   1.,
         1.,   1.,   1.,   1.,   1.,  39.,   1.,   1.,   1.,   1.,   1.,
       180.,   1.,   1.,   1., 167.,   4.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1., 133.,   1.,
         1.,   1., 123.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,  96.,   1.,   7.,
         7.,   7.,   7.,   7.,   7.,   7.,   1.,   1.,  13.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   4.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.]) of type <class 'numpy.ndarray'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Any suggestions on how I can update the PySpark feature vector?

Thanks

Answer

You are almost there, but Spark doesn't support NumPy types, and VectorUDT wouldn't match one anyway.

Instead:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors, VectorUDT

@udf(VectorUDT())
def zeros_to_ones(v):
    if v is None:
        return v
    # Sparse vector will become dense
    if isinstance(v, SparseVector):
        v = v.toArray()
        return DenseVector(np.where(v == 0, 1, v))
    if isinstance(v, DenseVector):
        return DenseVector(np.where(v.array == 0, 1, v.array))

Usage:

df = spark.createDataFrame(
    [(1, Vectors.dense([0, 1, 0, 3])), (2, Vectors.sparse(4, [0, 3], [0, 1]))], 
    ("id", "features")
)

df.withColumn("features_no_zeros", zeros_to_ones("features")).show(truncate=False)

+---+-------------------+-----------------+                                     
|id |features           |features_no_zeros|
+---+-------------------+-----------------+
|1  |[0.0,1.0,0.0,3.0]  |[1.0,1.0,1.0,3.0]|
|2  |(4,[0,3],[0.0,1.0])|[1.0,1.0,1.0,1.0]|
+---+-------------------+-----------------+
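
Applied to the DataFrame from the question, the same UDF would be used in the same way, e.g. (assuming the features column produced by CountVectorizer):

vectorizer_df = vectorizer_df.withColumn('features', zeros_to_ones('features'))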
