Apply a udf to multiple columns and use numpy operations


Problem description

I have a dataframe named result in pyspark and I want to apply a udf to create a new column, as shown below:

result = sqlContext.createDataFrame([(138,5,10), (128,4,10), (112,3,10), (120,3,10), (189,1,10)]).withColumnRenamed("_1","count").withColumnRenamed("_2","df").withColumnRenamed("_3","docs")
@udf("float")
def newFunction(arr):
    return (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])

result=result.withColumn("new_function_result",newFunction_udf(array("count","df","docs")))

The columns count, df, and docs are all integer columns, but this returns:

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

When I pass a single column and compute its squares, it works fine.

Any help is appreciated.

Recommended answer

The error message is misleading, but it is trying to tell you that your function doesn't return a Python float. Your function returns a value of type numpy.float64, which you can fetch with the VectorUDT type (function newFunctionVector in the example below). Another way to make use of numpy is to cast the numpy type numpy.float64 to the Python type float (function newFunctionWithArray in the example below).
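To make the type difference concrete, here is a minimal sketch that runs without Spark. The helper name score is hypothetical and only mirrors the formula from the question; it shows that the numpy expression yields numpy.float64, and that either .item() or float() turns it into a plain Python float.

```python
import numpy as np

# Hypothetical helper that mirrors the formula from the question.
def score(count, df, docs):
    return (1 + np.log(count)) * np.log(docs / df)

raw = score(138, 5, 10)

# The numpy expression produces numpy.float64, not the built-in float type.
print(type(raw))

# Two ways to obtain a plain Python float from the numpy scalar.
as_item = raw.item()
as_float = float(raw)
print(type(as_item), type(as_float))
```

Either conversion can be applied to the return value inside a udf declared with the "float" return type.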

Last but not least, it is not necessary to call array, because udfs can take more than one parameter (function newFunction in the example below).

import numpy as np
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType
from pyspark.mllib.linalg import Vectors, VectorUDT

# sqlContext is assumed to exist already, as in the question
result = sqlContext.createDataFrame([(138,5,10), (128,4,10), (112,3,10), (120,3,10), (189,1,10)], ["count","df","docs"])

# Returns the numpy.float64 value unchanged; wrapped as vector_udf further down
def newFunctionVector(arr):
    return (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])

# Takes a single array column; .item() converts numpy.float64 to a Python float
@udf("float")
def newFunctionWithArray(arr):
    returnValue = (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])
    return returnValue.item()

# Takes the three columns as separate arguments, so no array() call is needed
@udf("float")
def newFunction(count, df, docs):
    returnValue = (1 + np.log(count)) * np.log(docs/df)
    return returnValue.item()


vector_udf = udf(newFunctionVector, VectorUDT())

result = result.withColumn("new_function_result", newFunction("count","df","docs"))

result = result.withColumn("new_function_result_WithArray", newFunctionWithArray(array("count","df","docs")))

# Note: this column is also computed with newFunctionWithArray, so it is a float
# column in the schema; vector_udf is defined above but not applied here
result = result.withColumn("new_function_result_Vector", newFunctionWithArray(array("count","df","docs")))

result.printSchema()

result.show()

Output:

root 
|-- count: long (nullable = true) 
|-- df: long (nullable = true) 
|-- docs: long (nullable = true) 
|-- new_function_result: float (nullable = true) 
|-- new_function_result_WithArray: float (nullable = true) 
|-- new_function_result_Vector: float (nullable = true)

+-----+---+----+-------------------+-----------------------------+--------------------------+ 
|count| df|docs|new_function_result|new_function_result_WithArray|new_function_result_Vector|
+-----+---+----+-------------------+-----------------------------+--------------------------+ 
|  138|  5|  10|           4.108459|                     4.108459|                  4.108459| 
|  128|  4|  10|           5.362161|                     5.362161|                  5.362161|
|  112|  3|  10|          6.8849173|                    6.8849173|                 6.8849173|
|  120|  3|  10|           6.967983|                     6.967983|                  6.967983|
|  189|  1|  10|          14.372153|                    14.372153|                 14.372153|  
+-----+---+----+-------------------+-----------------------------+--------------------------+
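As a possible alternative that is not part of the original answer: because each udf call receives scalar values, the standard-library math.log can stand in for np.log, and it already returns a Python float, so no conversion is needed. The sketch below assumes that substitution is acceptable for your data; the name new_function_plain and the None guard (Spark passes null values to a Python udf as None) are illustrative choices.

```python
import math

# Hypothetical plain-Python version of the formula; not from the original answer.
def new_function_plain(count, df, docs):
    # Spark hands null column values to a Python udf as None.
    if count is None or df is None or docs is None:
        return None
    # math.log returns a built-in float, so the result needs no .item() call.
    return (1 + math.log(count)) * math.log(docs / df)

value = new_function_plain(138, 5, 10)
print(type(value), value)

# With pyspark available, the function could be wrapped in the same way as in
# the answer, for example: udf(new_function_plain, "float")
```

For the first row (138, 5, 10) this gives roughly 4.1085, in line with the first row of the output table above.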
