Pyspark UDF column on Dataframe
Question
I'm trying to create a new column on a dataframe based on the values of some columns. It's returning null in all cases. Anyone know what's going wrong with this simple example?
import pandas as pd
from pyspark.sql.functions import lit

df = pd.DataFrame([[0,1,0],[1,0,0],[1,1,1]], columns=['Foo','Bar','Baz'])
spark_df = spark.createDataFrame(df)

def get_profile():
    if 'Foo' == 1:
        return 'Foo'
    elif 'Bar' == 1:
        return 'Bar'
    elif 'Baz' == 1:
        return 'Baz'

spark_df = spark_df.withColumn('get_profile', lit(get_profile()))
spark_df.show()
Foo  Bar  Baz  get_profile
  0    1    0         None
  1    0    0         None
  1    1    1         None
I would expect that the get_profile column would be filled out for all rows.
I've also tried:
spark_udf = udf(get_profile,StringType())
spark_df = spark_df.withColumn('get_profile', spark_udf())
print(spark_df.toPandas())
with the same result.
Answer
The udf has no knowledge of what the column names are. So it checks each of the conditions in your if/elif block, all of them evaluate to False, and thus the function returns None.
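To see this failure mode in isolation, note that each condition compares a string literal to the integer 1, which is always False in Python. A minimal sketch, no Spark required:

```python
def get_profile():
    # Each branch compares the string literal to the integer 1;
    # a str never equals an int, so every condition is False
    # and the function falls through, returning None implicitly.
    if 'Foo' == 1:
        return 'Foo'
    elif 'Bar' == 1:
        return 'Bar'
    elif 'Baz' == 1:
        return 'Baz'

print('Foo' == 1)     # False
print(get_profile())  # None
```

Because the function is called once on the driver with no arguments, lit(get_profile()) just stamps that constant None onto every row.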
You'd have to rewrite your udf to take in the columns you want to check:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_profile(foo, bar, baz):
    if foo == 1:
        return 'Foo'
    elif bar == 1:
        return 'Bar'
    elif baz == 1:
        return 'Baz'

spark_udf = udf(get_profile, StringType())
spark_df = spark_df.withColumn('get_profile', spark_udf('Foo', 'Bar', 'Baz'))
spark_df.show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#| 0| 1| 0| Bar|
#| 1| 0| 0| Foo|
#| 1| 1| 1| Foo|
#+---+---+---+-----------+
If you have a lot of columns and want to pass them all (in order):
spark_df = spark_df.withColumn('get_profile', spark_udf(*spark_df.columns))
More generally, you can unpack any ordered list of columns:
cols_to_pass_to_udf = ['Foo', 'Bar', 'Baz']
spark_df = spark_df.withColumn('get_profile', spark_udf(*cols_to_pass_to_udf))
But this particular operation does not require a udf. I would do it this way:
from pyspark.sql.functions import coalesce, when, col, lit

spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c) == 1, lit(c)) for c in spark_df.columns])
).show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#| 0| 1| 0| Bar|
#| 1| 0| 0| Foo|
#| 1| 1| 1| Foo|
#+---+---+---+-----------+
This works because pyspark.sql.functions.when() will return null by default if the condition evaluates to False and no otherwise is specified. Then pyspark.sql.functions.coalesce, applied to the list of when expressions, will return the first non-null column.
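The semantics can be mimicked in plain Python to make the mechanism concrete. A minimal sketch; these helpers are illustrative stand-ins, not the actual Spark API:

```python
def when(condition, value):
    # Mimics Spark's when() with no otherwise(): yields None on False.
    return value if condition else None

def coalesce(*values):
    # Mimics Spark's coalesce(): first non-None value, else None.
    return next((v for v in values if v is not None), None)

row = {'Foo': 1, 'Bar': 0, 'Baz': 1}
cols = ['Foo', 'Bar', 'Baz']
profile = coalesce(*[when(row[c] == 1, c) for c in cols])
print(profile)  # Foo: the first non-None result in column order
```

Spark evaluates the same logic per row, column-wise, without ever serializing rows out to a Python worker, which is why this beats the udf on performance.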
Note this is equivalent to the udf ONLY if the order of the columns is the same as the sequence that's evaluated in the get_profile function. To be more explicit, you should do:
spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c) == 1, lit(c)) for c in ['Foo', 'Bar', 'Baz']])
).show()
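To illustrate why the order matters, here is a plain-Python sketch (a hypothetical helper, not the Spark API) of the tie-breaking behavior for rows where several columns equal 1:

```python
def first_match(row, cols):
    # Returns the first column name (in the given order) whose value
    # is 1, mirroring coalesce over when(col(c) == 1, lit(c)).
    return next((c for c in cols if row[c] == 1), None)

row = {'Foo': 1, 'Bar': 1, 'Baz': 1}
print(first_match(row, ['Foo', 'Bar', 'Baz']))  # Foo
print(first_match(row, ['Bar', 'Foo', 'Baz']))  # Bar: order decides ties
```

With an explicit column list, the coalesce expression always checks columns in the same sequence as the if/elif chain in get_profile, regardless of the DataFrame's own column order.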