PySpark 逐行函数组合 [英] PySpark row-wise function composition

查看:30
本文介绍了PySpark 逐行函数组合的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

作为一个简化的例子,我有一个数据框df",列col1,col2",我想在对每列应用一个函数后计算行式最大值:

As a simplified example, I have a dataframe "df" with columns "col1,col2" and I want to compute a row-wise maximum after applying a function to each column :

def f(x):
    return (x+1)

max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())

df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))

所以如果 df:

col1   col2
1      2
3      0

然后

df2:

col1   col2  result
1      2     3
3      0     4

以上似乎不起作用并产生无法计算表达式:PythonUDF#f..."

The above doesn't seem to work and produces "Cannot evaluate expression: PythonUDF#f..."

我绝对肯定f_udf"在我的桌子上工作得很好,主要问题在于 max_udf.

I'm absolutely positive "f_udf" works just fine on my table, and the main issue is with the max_udf.

不创建额外的列或使用基本的 map/reduce,有没有办法完全使用数据框和 udfs 来完成上述操作?我应该如何修改max_udf"?

Without creating extra columns or using basic map/reduce, is there a way to do the above entirely using dataframes and udfs? How should I modify "max_udf"?

我也试过:

max_udf=udf(max, IntegerType())

产生相同的错误.

我还确认以下内容有效:

I've also confirmed that the following works:

df2=(df.withColumn("temp1", f_udf(df.col1))
       .withColumn("temp2", f_udf(df.col2))

df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))

为什么我不能一次性完成这些?

Why is it that I can't do these in one go?

我想看到一个可以推广到任何函数f_udf"和max_udf"的答案.

推荐答案

我遇到了类似的问题,并在 这个stackoverflow 问题

I had a similar problem and found the solution in the answer to this stackoverflow question

要将多列或整行传递给 UDF,请使用 结构:

To pass multiple columns or a whole row to an UDF use a struct:

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))

count_empty_columns = udf(lambda row: len([x for x in row if x == None]), IntegerType())

new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns])))

new_df.show()

返回:

+----+----+----------+
|   a|   b|null_count|
+----+----+----------+
|null|null|         2|
|   1|null|         1|
|null|   2|         1|
+----+----+----------+

这篇关于PySpark 逐行函数组合的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆