Pyspark UDF column on Dataframe


Problem description

I'm trying to create a new column on a dataframe based on the values of some columns. It's returning null in all cases. Anyone know what's going wrong with this simple example?

import pandas as pd
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType

df = pd.DataFrame([[0,1,0],[1,0,0],[1,1,1]],columns = ['Foo','Bar','Baz'])

spark_df = spark.createDataFrame(df)

def get_profile():
    if 'Foo'==1:
        return 'Foo'
    elif 'Bar' == 1:
        return 'Bar'
    elif 'Baz' ==1 :
        return 'Baz'

spark_df = spark_df.withColumn('get_profile', lit(get_profile()))
spark_df.show()

   Foo  Bar  Baz get_profile
    0    1    0        None
    1    0    0        None
    1    1    1        None

I would expect that the get_profile column would be filled out for all rows.

I've also tried:

spark_udf = udf(get_profile,StringType())

spark_df = spark_df.withColumn('get_profile', spark_udf())
print(spark_df.toPandas())

with the same result.

Answer

The udf has no knowledge of what the column names are. So it checks each of your conditions in your if/elif block and all of them evaluate to False. Thus the function will return None.
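
To see why (my own illustration, not part of the original post): the branches compare the string literals 'Foo', 'Bar', 'Baz' to the integer 1, so every condition is False and the function falls through to return None, which lit() then turns into a column of nulls.

print('Foo' == 1)     # False: comparing the string 'Foo' to the integer 1
print(get_profile())  # None: no branch matched, so lit(None) fills the column with nulls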

You'd have to rewrite your udf to take in the columns you want to check:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_profile(foo, bar, baz):
    if foo == 1:
        return 'Foo'
    elif bar == 1:
        return 'Bar'
    elif baz == 1:
        return 'Baz'

spark_udf = udf(get_profile, StringType())
spark_df = spark_df.withColumn('get_profile',spark_udf('Foo', 'Bar', 'Baz'))
spark_df.show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#|  0|  1|  0|        Bar|
#|  1|  0|  0|        Foo|
#|  1|  1|  1|        Foo|
#+---+---+---+-----------+

If you have a lot of columns and want to pass them all (in order):

spark_df = spark_df.withColumn('get_profile', spark_udf(*spark_df.columns))

More generally, you can unpack any ordered list of columns:

cols_to_pass_to_udf = ['Foo', 'Bar', 'Baz']
spark_df = spark_df.withColumn('get_profile', spark_udf(*cols_to_pass_to_udf))

But this particular operation does not require a udf. I would do it this way:

from pyspark.sql.functions import coalesce, when, col, lit

spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns])
).show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#|  0|  1|  0|        Bar|
#|  1|  0|  0|        Foo|
#|  1|  1|  1|        Foo|
#+---+---+---+-----------+

This works because pyspark.sql.functions.when() will return null by default if the condition evaluates to False and no otherwise is specified. Then the list comprehension of pyspark.sql.functions.coalesce will return the first non-null column.
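
As a small illustration of that default (my own example, not from the original answer, assuming the same spark session and imports as above), a row that matches none of the conditions ends up as null, because every when() yields null and coalesce() finds nothing non-null:

no_match_df = spark.createDataFrame([(0, 0, 0)], ['Foo', 'Bar', 'Baz'])
no_match_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c) == 1, lit(c)) for c in no_match_df.columns])
).show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#|  0|  0|  0|       null|
#+---+---+---+-----------+

The rewritten udf would behave the same way on such a row, since no branch matches and the function falls through to return None.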

Note this is equivalent to the udf ONLY if the order of the columns is the same as the sequence that's evaluated in the get_profile function. To be more explicit, you should do:

spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c)==1, lit(c)) for c in ['Foo', 'Bar', 'Baz']])
).show()
