Spark (scala) dataframes - Check whether strings in column contain any items from a set
Question
I'm pretty new to Scala and Spark and I've been trying to find a solution for this issue all day - it's doing my head in. I've tried 20 different variations of the following code and keep getting type mismatch errors when I try to perform calculations on a column.
I have a Spark dataframe, and I wish to check whether each string in a particular column contains any number of words from a pre-defined List (or Set) of words.
Here is some example data for replication:
// sample data frame
val df = Seq(
(1, "foo"),
(2, "barrio"),
(3, "gitten"),
(4, "baa")).toDF("id", "words")
// dictionary Set of words to check
val dict = Set("foo","bar","baaad")
Now, I am trying to create a third column with the results of a comparison, to see whether the strings in the $"words" column contain any of the words in the dict Set. So the result should be:
+---+-----------+-------------+
| id| words| word_check|
+---+-----------+-------------+
| 1| foo| true|
| 2| barrio| true|
| 3| gitten| false|
| 4| baa| false|
+---+-----------+-------------+
First, I tried to see if I could do it natively without using UDFs, since the dict Set will actually be a large dictionary of > 40K words, and as I understand it this would be more efficient than a UDF:
df.withColumn("word_check", dict.exists(d => $"words".contains(d)))
But I get the error:
type mismatch;
found : org.apache.spark.sql.Column
required: Boolean
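The mismatch happens because dict.exists(...) is evaluated eagerly on the driver and returns a plain Scala Boolean, while withColumn expects a Column expression. A minimal UDF-free sketch (workable only for a small dictionary - with 40K words the resulting expression tree would be enormous) is to fold the set into one Column by OR-ing a contains() check per word:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Build a single Column: words.contains(w1) || words.contains(w2) || ...
val wordCheck: Column = dict
  .map(w => col("words").contains(w))
  .reduce(_ || _)

df.withColumn("word_check", wordCheck)
```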
I have also tried to create a UDF to do this (using both mutable.Set and mutable.WrappedArray to describe the Set - not sure which is correct, but neither works):
val checker: ((String, scala.collection.mutable.Set[String]) => Boolean) =
  (col: String, array: scala.collection.mutable.Set[String]) => array.exists(d => col.contains(d))
val udf1 = udf(checker)
df.withColumn("word_check", udf1($"words", dict )).show()
But I get another type mismatch:
found : scala.collection.immutable.Set[String]
required: org.apache.spark.sql.Column
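This second mismatch is because every argument passed to a udf at call time must be a Column, so a plain Scala Set can't be handed in as udf1's second argument. A minimal sketch of a fix is to capture dict in the udf's closure instead, so the udf takes only the string column (fine for a small set; for 40K words see the broadcast approach in the answer):

```scala
import org.apache.spark.sql.functions.udf

// dict is captured in the closure rather than passed as a Column argument
val containsAny = udf((s: String) => dict.exists(w => s.contains(w)))

df.withColumn("word_check", containsAny($"words")).show()
```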
If the set were a fixed number, I should be able to use lit(Int) in the expression? But I don't really understand how performing more complex functions on a column by mixing different data types works in Scala.
Any help greatly appreciated, especially if it can be done efficiently (it is a large df of > 5m rows).
Answer
If your dict is large, you should not just reference it in your udf, because the entire dict is sent over the network for every task. I would broadcast your dict in combination with a udf:
import org.apache.spark.broadcast.Broadcast
def udf_check(words: Broadcast[scala.collection.immutable.Set[String]]) = {
udf {(s: String) => words.value.exists(s.contains(_))}
}
df.withColumn("word_check", udf_check(sparkContext.broadcast(dict))($"words"))
Alternatively, you could also use a join:
val dict_df = dict.toList.toDF("word")
df
.join(broadcast(dict_df),$"words".contains($"word"),"left")
.withColumn("word_check",$"word".isNotNull)
.drop($"word")
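One caveat with the join variant (a sketch, not covered in the answer above): if a value in "words" contains more than one dictionary word, the left join emits one output row per matching dictionary word, duplicating that id. Deduplicating afterwards restores one row per id:

```scala
import org.apache.spark.sql.functions.broadcast

df
  .join(broadcast(dict_df), $"words".contains($"word"), "left")
  .withColumn("word_check", $"word".isNotNull)
  .drop($"word")
  .dropDuplicates("id")   // collapse multiple dictionary matches per id
```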