Issue with UDF on a column of Vectors in PySpark DataFrame


Problem description

I am having trouble using a UDF on a column of Vectors in PySpark which can be illustrated here:

from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors

# sc is the SparkContext provided by the pyspark shell
FeatureRow = Row('id', 'features')
data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
                       (1, Vectors.dense([2.25, -11.1, 123.2])),
                       (2, Vectors.dense([-7.2, 1.0, -3.2]))])
df = data.map(lambda r: FeatureRow(*r)).toDF()

# UDF that should sum the elements of each feature vector
vector_udf = udf(lambda vector: sum(vector), DoubleType())

df.withColumn('feature_sums', vector_udf(df.features)).first()

This fails with the following stack trace:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage 31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/colin/src/spark/python/pyspark/sql/functions.py", line 469, in <lambda>
    func = lambda _, it: map(lambda x: f(*x), it)
  File "/Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py", line 143, in <lambda>
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'

Looking at what gets passed to the UDF, there seems to be something strange. The argument passed should be a Vector, but instead it gets passed a Python tuple like this:

(1, None, None, [9.7, 1.0, -3.2])
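
One way to see this directly is to have a UDF return the repr of whatever it actually receives (a minimal sketch reusing the df defined above; debug_udf is just an illustrative name, not part of the original question):

from pyspark.sql.types import StringType

# Return the repr of the raw value the Python UDF actually receives
debug_udf = udf(lambda v: repr(v), StringType())
df.withColumn('raw_features', debug_udf(df.features)).select('raw_features').first()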

Is it not possible to use UDFs on DataFrame columns of Vectors?

EDIT

So it was pointed out on the mailing list that this is a known issue. Going to accept the answer from @hyim since it does provide a temporary workaround for dense vectors.

Answer

In spark-sql, vectors are treated as (type, size, indices, values) tuples.

You can use a UDF on vectors with pyspark. Just modify the code slightly to work with the values in the vector type:

# vector[3] is the values array in the serialized (type, size, indices, values) representation
vector_udf = udf(lambda vector: sum(vector[3]), DoubleType())

df.withColumn('feature_sums', vector_udf(df.features)).first()

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
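
As a side note (my own addition, not part of the original answer): in newer Spark versions (2.x and later, to the best of my knowledge) the Python UDF receives the actual Vector object rather than its serialized tuple, so with the Spark 2.x DataFrame API and pyspark.ml.linalg a sketch like the following should work without indexing into the tuple:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()
df2 = spark.createDataFrame(
    [(0, Vectors.dense([9.7, 1.0, -3.2])),
     (1, Vectors.dense([2.25, -11.1, 123.2]))],
    ['id', 'features'])

# The UDF receives a DenseVector here, so toArray() yields a numpy array;
# float() converts the numpy scalar into the Python float that DoubleType expects.
sum_udf = udf(lambda v: float(v.toArray().sum()), DoubleType())
df2.withColumn('feature_sums', sum_udf(df2.features)).first()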
