pyspark how do we check if a column value is contained in a list


Question

I'm trying to figure out if there is a function that would check if a column of a Spark DataFrame contains any of the values in a list:

# define a dataframe
rdd = sc.parallelize([(0,100), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])

# define a list of scores
l = [1]

# keep records whose score contains any value from the list l
records = df.filter(df.score.contains(l))

# expected: (0,100), (0,1), (1,10), (3,18)

I get an error when running this code:

java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [1]

Is there a way to do this, or do we have to loop through the list and call contains for each value?

Answer

I see some ways to do this without using a udf.

You could use a list comprehension with pyspark.sql.functions.regexp_extract, exploiting the fact that an empty string is returned if there is no match.

Try to extract all of the values in the list l and concatenate the results. If the resulting concatenated string is an empty string, that means none of the values matched.

For example:

from pyspark.sql.functions import concat, regexp_extract

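# concatenate one regexp_extract per value; a non-empty result means at least one match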
records = df.where(concat(*[regexp_extract("score", str(val), 0) for val in l]) != "")
records.show()
#+---+-----+
#| id|score|
#+---+-----+
#|  0|  100|
#|  0|    1|
#|  1|   10|
#|  3|   18|
#|  3|   18|
#|  3|   18|
#+---+-----+

If you take a look at the execution plan, you'll see that it's smart enough to cast the score column to string implicitly:

records.explain()
#== Physical Plan ==
#*Filter NOT (concat(regexp_extract(cast(score#11L as string), 1, 0)) = )
#+- Scan ExistingRDD[id#10L,score#11L]
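
One caveat: regexp_extract treats each value as a regular expression, so if l could hold values with regex metacharacters, you may want to escape them first. A minimal sketch of the same filter using Python's re.escape (which covers the common metacharacters of Spark's Java-style regexes):

import re
from pyspark.sql.functions import concat, regexp_extract

# escape each value so it is matched literally rather than as a regex
records = df.where(
    concat(*[regexp_extract("score", re.escape(str(val)), 0) for val in l]) != ""
)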

Another way is to use pyspark.sql.Column.like (or similarly with rlike):

from functools import reduce
from pyspark.sql.functions import col

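# OR together one LIKE '%value%' test per value in l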
records = df.where(
    reduce(
        lambda a, b: a | b,
        [col("score").like("%{}%".format(val)) for val in l]
    )
)

This produces the same output as above, with the following execution plan:

#== Physical Plan ==
#*Filter Contains(cast(score#11L as string), 1)
#+- Scan ExistingRDD[id#10L,score#11L]
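
For the rlike variant the answer mentions, a minimal sketch that collapses the whole list into a single regex alternation (again assuming the values contain no regex metacharacters):

from pyspark.sql.functions import col

# build one alternation pattern, e.g. "1|2|3", and apply a single rlike test;
# rlike does an unanchored match, so it has the same substring semantics as above
pattern = "|".join(str(val) for val in l)
records = df.where(col("score").rlike(pattern))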

If you want only distinct records, you can do:

records.distinct().show()
#+---+-----+
#| id|score|
#+---+-----+
#|  0|    1|
#|  0|  100|
#|  3|   18|
#|  1|   10|
#+---+-----+
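
Finally, to the last part of the question: you don't have to loop row by row to use contains. A minimal sketch that ORs together one contains() test per value in l, equivalent in spirit to the like version above:

from functools import reduce
from pyspark.sql.functions import col

# contains() accepts a single value, just not a whole list,
# so build one substring test per value and OR them together
records = df.filter(
    reduce(lambda a, b: a | b, [col("score").contains(str(val)) for val in l])
)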
