PySpark: update the values in a feature vector
Question
I am building a text classifier and am using Spark CountVectorizer to create the feature vector.
Now, to use this vector with the BigDL library, I need to convert every 0 in the feature vector to 1.
Here is my feature vector, which is a sparse vector:
vectorizer_df.select('features').show(2)
+--------------------+
| features|
+--------------------+
|(1000,[4,6,11,13,...|
|(1000,[0,1,2,3,4,...|
+--------------------+
only showing top 2 rows
I am trying to update the values as shown below, first converting the sparse vector to a dense vector:
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import udf
update_vector = udf(lambda vector: Vectors.dense(vector), VectorUDT())
df = vectorizer_df.withColumn('features',update_vector(vectorizer_df.features))
df.select('features').show(2)
+--------------------+
| features|
+--------------------+
|[0.0,0.0,0.0,0.0,...|
|[5571.0,4688.0,24...|
+--------------------+
only showing top 2 rows
Once I have the dense vector, I try to add 1 to all the elements:
import numpy as np

def add1(x):
    return x + 1

def array_for(x):
    return np.array([add1(xi) for xi in x])

add_udf_one = udf(lambda z: array_for(z), VectorUDT())
df = df.select('features', add_udf_one('features').alias('feature_1'))
df.select('feature_1').show(2)
But now I get a TypeError, as shown below:
TypeError: cannot serialize array([ ....]) of type <class 'numpy.ndarray'>
The full error is as follows:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-98-5aa5196824cf> in <module>
----> 1 df.select('feature_1').show(2)
/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
348 """
349 if isinstance(truncate, bool) and truncate:
--> 350 print(self._jdf.showString(n, 20, vertical))
351 else:
352 print(self._jdf.showString(n, int(truncate), vertical))
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o1192.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 4886, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 73, in <lambda>
return lambda *a: toInternal(f(*a))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 682, in toInternal
return self._cachedSqlType().toInternal(self.serialize(obj))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 169, in serialize
raise TypeError("cannot serialize %r of type %r" % (obj, type(obj)))
TypeError: cannot serialize array([ 1., 1., 1., 1., 2., 1., 326., 1., 1., 1., 1.,
2., 1., 3., 1., 1., 1., 1., 383., 1., 312., 1.,
1., 1., 1., 1., 1., 39., 1., 1., 1., 1., 1.,
180., 1., 1., 1., 167., 4., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 133., 1.,
1., 1., 123., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 96., 1., 7.,
7., 7., 7., 7., 7., 7., 1., 1., 13., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 4., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]) of type <class 'numpy.ndarray'>
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Any suggestions on how I can update the PySpark feature vector?
Thanks.
Answer
You are almost there, but Spark doesn't support NumPy types, and VectorUDT wouldn't match one anyway.
Instead:
import numpy as np

from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors, VectorUDT

@udf(VectorUDT())
def zeros_to_ones(v):
    if v is None:
        return v
    # Sparse vector will become dense
    if isinstance(v, SparseVector):
        v = v.toArray()
        return DenseVector(np.where(v == 0, 1, v))
    if isinstance(v, DenseVector):
        return DenseVector(np.where(v.array == 0, 1, v.array))
Usage:
df = spark.createDataFrame(
    [(1, Vectors.dense([0, 1, 0, 3])), (2, Vectors.sparse(4, [0, 3], [0, 1]))],
    ("id", "features")
)

df.withColumn("features_no_zeros", zeros_to_ones("features")).show(truncate=False)
+---+-------------------+-----------------+
|id |features |features_no_zeros|
+---+-------------------+-----------------+
|1 |[0.0,1.0,0.0,3.0] |[1.0,1.0,1.0,3.0]|
|2 |(4,[0,3],[0.0,1.0])|[1.0,1.0,1.0,1.0]|
+---+-------------------+-----------------+
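If the goal is actually to add 1 to every element (as in the original add1 attempt) rather than only to replace zeros, the same pattern works. This is a minimal sketch using the imports from the snippet above, with add_one and features_plus_one as hypothetical names:

@udf(VectorUDT())
def add_one(v):
    if v is None:
        return v
    # toArray() returns a NumPy array for both sparse and dense vectors;
    # wrapping the result in DenseVector avoids the serialization error above
    return DenseVector(v.toArray() + 1)

df.withColumn("features_plus_one", add_one("features")).show(truncate=False)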