Issue with UDF on a column of Vectors in PySpark DataFrame


Problem Description

I am having trouble using a UDF on a column of Vectors in PySpark which can be illustrated here:

from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors

FeatureRow = Row('id', 'features')
data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
                       (1, Vectors.dense([2.25, -11.1, 123.2])),
                       (2, Vectors.dense([-7.2, 1.0, -3.2]))])
df = data.map(lambda r: FeatureRow(*r)).toDF()

vector_udf = udf(lambda vector: sum(vector), DoubleType())

df.withColumn('feature_sums', vector_udf(df.features)).first()

This fails with the following stack trace:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage 31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/colin/src/spark/python/pyspark/sql/functions.py", line 469, in <lambda>
    func = lambda _, it: map(lambda x: f(*x), it)
  File "/Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py", line 143, in <lambda>
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
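
As a side note (my own reading of the trace, not part of the original post): Python's built-in sum() starts from the integer 0, so it fails as soon as it hits a None field of the struct-like tuple shown below. The same TypeError can be reproduced in plain Python:

sum((1, None, None, [9.7, 1.0, -3.2]))
# TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'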

Looking at what gets passed to the UDF, there seems to be something strange. The argument passed should be a Vector, but instead it gets passed a Python tuple like this:

(1, None, None, [9.7, 1.0, -3.2])
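
For illustration, a minimal sketch (not in the original post) that makes the raw representation visible by returning repr() of the UDF argument as a string, using the df defined above:

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# Return the raw repr of whatever the UDF actually receives.
inspect_udf = udf(lambda v: repr(v), StringType())
df.withColumn('raw_features', inspect_udf(df.features)).first()
# On the affected Spark version this shows a tuple like the one above
# instead of a DenseVector.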

Is it not possible to use UDFs on DataFrame columns of Vectors?

EDIT

So it was pointed out on the mailing list that this is a known issue. Going to accept the answer from @hyim since it does provide a temporary workaround for dense vectors.

Recommended Answer

In Spark SQL, vectors are treated as a (type, size, indices, values) tuple.

You can use a UDF on vector columns in PySpark. Just modify your code to work with the values field of the vector type.

vector_udf = udf(lambda vector: sum(vector[3]), DoubleType())

df.withColumn('feature_sums', vector_udf(df.features)).first()
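
As a hedged follow-up sketch (not from the original answer), the same trick should also cover sparse vectors under the (type, size, indices, values) layout described above, since the values field of a sparse vector only stores its non-zero entries and the zeros do not change the sum:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

# Field 3 of the struct-like tuple holds the stored values for both
# dense (type=1) and sparse (type=0) vectors; summing it gives the
# vector's total sum in either case.
vector_sum_udf = udf(lambda v: float(sum(v[3])), DoubleType())
df.withColumn('feature_sums', vector_sum_udf(df.features)).first()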

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
