Spark (scala) dataframes - Check whether strings in column contain any items from a set


Question

I'm pretty new to scala and spark and I've been trying to find a solution for this issue all day - it's doing my head in. I've tried 20 different variations of the following code and keep getting type mismatch errors when I try to perform calculations on a column.

I have a spark dataframe, and I wish to check whether each string in a particular column contains any number of words from a pre-defined List (or Set) of words.

Here is some example data for replication:

// sample data frame
val df = Seq(
      (1, "foo"),
      (2, "barrio"),
      (3, "gitten"),
      (4, "baa")).toDF("id", "words")

// dictionary Set of words to check 
val dict = Set("foo","bar","baaad")

Now, i am trying to create a third column with the results of a comparison to see if the strings in the $"words" column within them contain any of the words in the dict Set of words. So the result should be:

+---+-----------+-------------+
| id|      words|   word_check| 
+---+-----------+-------------+
|  1|        foo|         true|     
|  2|     barrio|         true|
|  3|     gitten|        false|
|  4|        baa|        false|
+---+-----------+-------------+

First, I tried to see if i could do it natively without using UDFs, since the dict Set will actually be a large dictionary of > 40K words, and as I understand it this would be more efficient than a UDF:

df.withColumn("word_check", dict.exists(d => $"words".contains(d)))

But I got the error:

type mismatch;
found   : org.apache.spark.sql.Column
required: Boolean
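The mismatch arises because `exists` here runs in plain Scala: its predicate must return a `Boolean`, but `$"words".contains(d)` yields a `Column`. The intended substring logic itself is fine on ordinary Scala values, which is exactly what a UDF can exploit. A minimal sketch outside Spark (`containsAny` is a hypothetical helper name, not part of any API):

```scala
// Plain-Scala version of the intended predicate: does the string
// contain any dictionary word as a substring?
def containsAny(s: String, words: Set[String]): Boolean =
  words.exists(s.contains(_))

val dict = Set("foo", "bar", "baaad")
containsAny("barrio", dict) // true: "barrio" contains "bar"
containsAny("baa", dict)    // false: no dict word is a substring of "baa"
```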

I have also tried to create a UDF to do this (using both mutable.Set and mutable.WrappedArray to describe the Set - not sure which is correct but neither work):

val checker: ((String, scala.collection.mutable.Set[String]) => Boolean) = (col: String, array: scala.collection.mutable.Set[String] ) =>  array.exists(d => col.contains(d))

val udf1 = udf(checker)

df.withColumn("word_check", udf1($"words", dict )).show()

But I get another type mismatch:

 found   : scala.collection.immutable.Set[String]
 required: org.apache.spark.sql.Column

If the set was a fixed number, I should be able to use Lit(Int) in the expression? But I don't really understand how performing more complex functions on a column by mixing different data types works in Scala.

Any help greatly appreciated, especially if it can be done efficiently (it is a large df of > 5m rows).

Answer

If your dict is large, you should not just reference it in your udf, because the entire dict is then sent over the network for every task. I would broadcast your dict in combination with a udf:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions.udf

// wrap the broadcast handle in the UDF so only the handle (not the
// full dictionary) is serialized with each task
def udf_check(words: Broadcast[scala.collection.immutable.Set[String]]) = {
  udf { (s: String) => words.value.exists(s.contains(_)) }
}

// `spark` is the SparkSession (available as-is in spark-shell)
df.withColumn("word_check", udf_check(spark.sparkContext.broadcast(dict))($"words"))

Alternatively, you could also use a join:

import org.apache.spark.sql.functions.broadcast
// in spark-shell; otherwise also import spark.implicits._ for .toDF

val dict_df = dict.toList.toDF("word")

df
  .join(broadcast(dict_df), $"words".contains($"word"), "left")
  .withColumn("word_check", $"word".isNotNull)
  .drop($"word")
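One caveat with the contains-join: a single input string can satisfy the join condition against several dictionary words, in which case the left join emits one row per match and the result needs a `dropDuplicates("id")` (or a `groupBy`) after the join. A plain-Scala sketch of the effect, where the `"rio"` entry is a hypothetical addition purely for illustration:

```scala
// With overlapping dictionary entries, one input string can match
// several words, which a contains-based join turns into several rows.
val dictWithOverlap = List("foo", "bar", "baaad", "rio") // "rio" added for illustration
val hits = dictWithOverlap.filter("barrio".contains(_))
// "barrio" matches both "bar" and "rio", so the join would emit
// two rows for id 2 before deduplication.
```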
