Filter Pyspark Dataframe with udf on entire row
Question
Is there a way to select the entire row as a column to input into a Pyspark filter udf?
I have a complex filtering function "my_filter" that I want to apply to the entire DataFrame:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
my_filter_udf = udf(lambda r: my_filter(r), BooleanType())
new_df = df.filter(my_filter_udf(col("*")))
But col("*") throws an error because that's not a valid operation.
I know that I can convert the DataFrame to an RDD and then use the RDD's filter method, but I do NOT want to convert it to an RDD and back into a DataFrame. My DataFrame has complex nested types, so schema inference fails when I try to convert the RDD back into a DataFrame.
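As an aside, the schema-inference failure on that round trip can usually be avoided by reusing the original schema instead of inferring a new one. A minimal sketch, assuming a SparkSession named spark and a plain-Python predicate my_filter(row) -> bool (both names are illustrative, not from the original question):
# Hypothetical sketch: filter the RDD of Rows, then rebuild the DataFrame
# by passing the original schema explicitly so nothing is re-inferred.
filtered_rdd = df.rdd.filter(my_filter)
new_df = spark.createDataFrame(filtered_rdd, schema=df.schema)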
Answer
You should list all of the columns explicitly (statically). For example:
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

# create a sample DataFrame
df = sc.parallelize([
    (1, 'b'),
    (1, 'c'),
]).toDF(["id", "category"])

# simple filter function; inside the udf the arguments are plain Python values
@F.udf(returnType=BooleanType())
def my_filter(col1, col2):
    return (col1 > 0) and (col2 == "b")

df.filter(my_filter('id', 'category')).show()
Result:
+---+--------+
| id|category|
+---+--------+
| 1| b|
+---+--------+
If you have many columns and you are sure of the column order:
# pass every column, in order, as separate arguments to the udf
cols = df.columns
df.filter(my_filter(*cols)).show()
This produces the same output.
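If you would rather not pass the columns as separate arguments at all, a common variant (not from the original answer) is to pack the whole row into a single struct column, so the udf receives it as one Row object. A minimal sketch, assuming the same sample df as above:
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

@F.udf(returnType=BooleanType())
def my_row_filter(row):
    # row is a Row built from the struct of all columns
    return (row["id"] > 0) and (row["category"] == "b")

df.filter(my_row_filter(F.struct(*df.columns))).show()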