一个RDD中的部分/完全匹配值与另一个RDD中的值 [英] Partial/Full-match value in one RDD to values in another RDD
本文介绍了一个RDD中的部分/完全匹配值与另一个RDD中的值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有两个RDD,其中第一个RDD具有以下格式的记录
I have two RDDs where the first RDD has records of the form
RDD1 = (1, 2017-2-13,"ABX-3354 gsfette"
2, 2017-3-18,"TYET-3423 asdsad"
3, 2017-2-09,"TYET-3423 rewriu"
4, 2017-2-13,"ABX-3354 42324"
5, 2017-4-01,"TYET-3423 aerr")
第二个RDD具有以下格式的记录
and the second RDD has records of the form
RDD2 = ('mfr1',"ABX-3354")
('mfr2',"TYET-3423")
我需要找到RDD1中所有与RDD1的第3列匹配到RDD2的第2列的值完全匹配/部分匹配的记录,并得到计数
I need to find all the records in RDD1 which have a full match/partial match for each value in RDD2 matching the 3rd Column of RDD1 to 2nd column of RDD2 and get the count
在此示例中,最终结果将是:
For this example, the end result would be:
ABX-3354 2
TYET-3423 3
做到这一点的最佳方法是什么?
What is the best way to do this?
推荐答案
我正在发布一些使用Spark SQL的解决方案,并且更专注于给定文本中搜索字符串的精确模式匹配.
I am posting couple of solutions with Spark SQL and more focused towards accurate pattern matching of search string in given text.
import spark.implicits._
val df1 = Seq(
(1, "2017-2-13", "ABX-3354 gsfette"),
(2, "2017-3-18", "TYET-3423 asdsad"),
(3, "2017-2-09", "TYET-3423 rewriu"),
(4, "2017-2-13", "ABX-335442324"), //changed from "ABX-3354 42324"
(5, "2017-4-01", "aerrTYET-3423") //changed from "TYET-3423 aerr"
).toDF("id", "dt", "txt")
val df2 = Seq(
("mfr1", "ABX-3354"),
("mfr2", "TYET-3423")
).toDF("col1", "key")
//match function for filter
def matcher(row: Row): Boolean = row.getAs[String]("txt")
.contains(row.getAs[String]("key"))
val join = df1.crossJoin(df2)
import org.apache.spark.sql.functions.count
val result = join.filter(matcher _)
.groupBy("key")
.agg(count("txt").as("count"))
2:使用广播变量
import spark.implicits._
val df1 = Seq(
(1, "2017-2-13", "ABX-3354 gsfette"),
(2, "2017-3-18", "TYET-3423 asdsad"),
(3, "2017-2-09", "TYET-3423 rewriu"),
(4, "2017-2-13", "ABX-3354 42324"),
(5, "2017-4-01", "aerrTYET-3423"),
(6, "2017-4-01", "aerrYET-3423")
).toDF("id", "dt", "pattern")
//small dataset to broadcast
val df2 = Seq(
("mfr1", "ABX-3354"),
("mfr2", "TYET-3423")
).map(_._2) // considering only 2 values in pair
//Lookup to use in UDF
val lookup = spark.sparkContext.broadcast(df2)
//Udf
import org.apache.spark.sql.functions._
val matcher = udf((txt: String) => {
val matches: Seq[String] = lookup.value.filter(txt.contains(_))
if (matches.size > 0) matches.head else null
})
val result = df1.withColumn("match", matcher($"pattern"))
.filter($"match".isNotNull) // not interested in non matching records
.groupBy("match")
.agg(count("pattern").as("count"))
两个解决方案的输出相同
result.show()
+---------+-----+
| key|count|
+---------+-----+
|TYET-3423| 3|
| ABX-3354| 2|
+---------+-----+
这篇关于一个RDD中的部分/完全匹配值与另一个RDD中的值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文