Partial/Full-match value in one RDD to values in another RDD

Problem Description

I have two RDDs, where the first RDD has records of the form

RDD1 = (1, 2017-2-13,"ABX-3354 gsfette"
        2, 2017-3-18,"TYET-3423 asdsad"
        3, 2017-2-09,"TYET-3423 rewriu"
        4, 2017-2-13,"ABX-3354 42324"
        5, 2017-4-01,"TYET-3423 aerr")

and the second RDD has records of the form

RDD2 = ('mfr1',"ABX-3354")
       ('mfr2',"TYET-3423")

I need to find all the records in RDD1 that have a full or partial match for each value in RDD2, matching the 3rd column of RDD1 against the 2nd column of RDD2, and get the count.

For this example, the end result would be:

ABX-3354  2
TYET-3423 3

What would be the best way of doing this?
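
For context, here is a minimal RDD-level sketch of the matching and counting being asked for, assuming a SparkContext named sc and the sample data above; the names rdd1, rdd2, keys and counts are my own illustration, and the recommended answer below works with DataFrames instead:

//Hypothetical sketch: broadcast the small key list and count matches per key
val rdd1 = sc.parallelize(Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-3354 42324"),
  (5, "2017-4-01", "TYET-3423 aerr")))
val rdd2 = sc.parallelize(Seq(("mfr1", "ABX-3354"), ("mfr2", "TYET-3423")))

val keys = sc.broadcast(rdd2.map(_._2).collect()) //small side, safe to collect and broadcast
val counts = rdd1
  .flatMap { case (_, _, txt) => keys.value.filter(k => txt.contains(k)) } //emit every key found in txt
  .countByValue() //Map(ABX-3354 -> 2, TYET-3423 -> 3)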

Recommended Answer

I am posting a couple of solutions using Spark SQL, more focused on accurate pattern matching of the search string in the given text.

1: Using crossJoin

import spark.implicits._

val df1 = Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-335442324"), //changed from "ABX-3354 42324"
  (5, "2017-4-01", "aerrTYET-3423") //changed from "TYET-3423 aerr"
).toDF("id", "dt", "txt")

val df2 = Seq(
  ("mfr1", "ABX-3354"),
  ("mfr2", "TYET-3423")
).toDF("col1", "key")

//match function for filter
def matcher(row: Row): Boolean = row.getAs[String]("txt")
  .contains(row.getAs[String]("key"))

//cross join: pair every df1 row with every df2 key so the matcher can test containment
val join = df1.crossJoin(df2)

import org.apache.spark.sql.functions.count

val result = join.filter(matcher _)
  .groupBy("key")
  .agg(count("txt").as("count"))
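
If you would rather avoid the Row-based filter, the same containment check can also be written as the join condition itself. This variation is my own (reusing df1, df2 and the imports above), not part of the original answer:

//Variation (assumption): push the containment test into the join condition
val resultViaJoin = df1
  .join(df2, $"txt".contains($"key")) //keeps only (row, key) pairs where txt contains key
  .groupBy("key")
  .agg(count("txt").as("count"))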

2: Using a broadcast variable

import spark.implicits._

val df1 = Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-3354 42324"),
  (5, "2017-4-01", "aerrTYET-3423"),
  (6, "2017-4-01", "aerrYET-3423")
).toDF("id", "dt", "pattern")

//small dataset to broadcast
val df2 = Seq(
  ("mfr1", "ABX-3354"),
  ("mfr2", "TYET-3423")
).map(_._2) //keep only the 2nd value (the key) of each pair

//Lookup to use in UDF
val lookup = spark.sparkContext.broadcast(df2)

//UDF: returns the first broadcast key contained in the text, or null if none match
import org.apache.spark.sql.functions._
val matcher = udf((txt: String) => {
  val matches: Seq[String] = lookup.value.filter(txt.contains(_))
  if (matches.size > 0) matches.head else null
})

val result = df1.withColumn("match", matcher($"pattern"))
  .filter($"match".isNotNull) // not interested in non matching records
  .groupBy("match")
  .agg(count("pattern").as("count"))

Both solutions produce the same output (in the second solution the grouping column is named match rather than key):

result.show()

+---------+-----+
|      key|count|
+---------+-----+
|TYET-3423|    3|
| ABX-3354|    2|
+---------+-----+
