在整行上使用udf过滤Pyspark Dataframe [英] Filter Pyspark Dataframe with udf on entire row

查看:847
本文介绍了在整行上使用udf过滤Pyspark Dataframe的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以选择将整行作为一列输入到Pyspark过滤器udf中?

Is there a way to select the entire row as a column to input into a Pyspark filter udf?

我有一个复杂的过滤功能"my_filter",我想将其应用于整个DataFrame:

I have a complex filtering function "my_filter" that I want to apply to the entire DataFrame:

my_filter_udf = udf(lambda r: my_filter(r), BooleanType())
new_df = df.filter(my_filter_udf(col("*"))

但是

col("*")

引发错误,因为这不是有效的操作.

throws an error because that's not a valid operation.

我知道我可以将数据帧转换为RDD,然后使用RDD的filter方法,但是我不想将其转换为RDD,然后再转换为数据帧.我的DataFrame具有复杂的嵌套类型,因此当我尝试再次将RDD转换为数据帧时,架构推断将失败.

I know that I can convert the dataframe to an RDD and then use the RDD's filter method, but I do NOT want to convert it to an RDD and then back into a dataframe. My DataFrame has complex nested types, so the schema inference fails when I try to convert the RDD into a dataframe again.

推荐答案

您应静态写入所有列.例如:

You should write all columns staticly. For example:

from pyspark.sql import functions as F

# create sample df
df = sc.parallelize([
     (1, 'b'),
     (1, 'c'),

 ]).toDF(["id", "category"])

#simple filter function
@F.udf(returnType=BooleanType())
def my_filter(col1, col2):
    return (col1>0) & (col2=="b")

df.filter(my_filter('id', 'category')).show()

结果:

+---+--------+
| id|category|
+---+--------+
|  1|       b|
+---+--------+

如果您有很多列,并且您确定要按列排序:

If you have so many columns and you are sure to order of columns:

cols = df.columns
df.filter(my_filter(*cols)).show()

产生相同的输出.

这篇关于在整行上使用udf过滤Pyspark Dataframe的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆