如何在Spark的过滤条件中使用NOT IN子句 [英] How to use NOT IN clause in filter condition in spark

查看:2698
本文介绍了如何在Spark的过滤条件中使用NOT IN子句的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我要过滤RDD源的列:

I want to filter a column of an RDD source :

val source = sql("SELECT * from sample.source").rdd.map(_.mkString(","))
val destination = sql("select * from sample.destination").rdd.map(_.mkString(","))

val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))

val src = source_primary_key.subtractByKey(destination_primary_key)

我想在过滤条件中使用IN子句,仅从源中过滤出src中存在的值,如下所示(编辑):

I want to use IN clause in filter condition to filter out only the values present in src from source, something like below(EDITED):

val source = spark.read.csv(inputPath + "/source").rdd.map(_.mkString(","))
val destination = spark.read.csv(inputPath + "/destination").rdd.map(_.mkString(","))

val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))

val extra_in_source = source_primary_key.filter(rec._1 != destination_primary_key._1)

等效的SQL代码是

SELECT * FROM SOURCE WHERE ID IN (select ID from src)

谢谢

推荐答案

由于您的代码不可复制,因此下面是一个使用spark-sql的小示例,说明如何使用select * from t where id in (...):

Since your code isn't reproducible, here is a small example using spark-sql on how to select * from t where id in (...) :

// create a DataFrame for a range 'id' from 1 to 9.
scala> val df = spark.range(1,10).toDF
df: org.apache.spark.sql.DataFrame = [id: bigint]

// values to exclude
scala> val f = Seq(5,6,7)
f: Seq[Int] = List(5, 6, 7)

// select * from df where id is not in the values to exclude
scala> df.filter(!col("id").isin(f  : _*)).show
+---+                                                                           
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  8|
|  9|
+---+

// select * from df where id is in the values to exclude
scala> df.filter(col("id").isin(f  : _*)).show

这是not isin的RDD版本:

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val f = Seq(5,6,7)
f: Seq[Int] = List(5, 6, 7)

scala> val rdd2 = rdd.filter(x => !f.contains(x))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at filter at <console>:28

尽管如此,我仍然认为这是一个过大的杀伤力,因为您已经在使用spark-sql.

Nevertheless, I still believe this is an overkill since you are already using spark-sql.

在您的情况下,您似乎实际上正在处理DataFrames,因此上述解决方案不起作用.

It seems in your case that you are actually dealing with DataFrames, thus the solutions mentioned above don't work.

您可以使用左防连接方法:

You can use the left anti join approach :

scala> val source = spark.read.format("csv").load("source.file")
source: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 9 more fields]

scala> val destination = spark.read.format("csv").load("destination.file")
destination: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 9 more fields]

scala> source.show
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
|_c0|               _c1|     _c2|       _c3|            _c4|_c5|_c6|       _c7|  _c8|      _c9|        _c10|
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
|  1|        Ravi kumar|   Ravi |     kumar|           MSO |  1|  M|17-01-1994| 74.5| 24000.78|    Alabama |
|  2|Shekhar shudhanshu| Shekhar|shudhanshu|      Manulife |  2|  M|18-01-1994|76.34|   250000|     Alaska |
|  3|Preethi Narasingam| Preethi|Narasingam|        Retail |  3|  F|19-01-1994|77.45|270000.01|    Arizona |
|  4|     Abhishek Nair|Abhishek|      Nair|       Banking |  4|  M|20-01-1994|78.65|   345000|   Arkansas |
|  5|        Ram Sharma|     Ram|    Sharma|Infrastructure |  5|  M|21-01-1994|79.12|    45000| California |
|  6|   Chandani Kumari|Chandani|    Kumari|          BNFS |  6|  F|22-01-1994|80.13| 43000.02|   Colorado |
|  7|      Balaji Kumar|  Balaji|     Kumar|           MSO |  1|  M|23-01-1994|81.33|  1234678|Connecticut |
|  8|  Naveen Shekrappa|  Naveen| Shekrappa|      Manulife |  2|  M|24-01-1994|  100|   789414|   Delaware |
|  9|     Milind Chavan|  Milind|    Chavan|        Retail |  3|  M|25-01-1994|83.66|   245555|    Florida |
| 10|      Raghu Rajeev|   Raghu|    Rajeev|       Banking |  4|  M|26-01-1994|87.65|   235468|     Georgia|
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+


scala> destination.show
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
|_c0|                _c1|     _c2|       _c3|            _c4|_c5|_c6|       _c7|  _c8|      _c9|        _c10|
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
|  1|         Ravi kumar|   Revi |     kumar|           MSO |  1|  M|17-01-1994| 74.5| 24000.78|    Alabama |
|  1|        Ravi1 kumar|   Revi |     kumar|           MSO |  1|  M|17-01-1994| 74.5| 24000.78|    Alabama |
|  1|        Ravi2 kumar|   Revi |     kumar|           MSO |  1|  M|17-01-1994| 74.5| 24000.78|    Alabama |
|  2| Shekhar shudhanshu| Shekhar|shudhanshu|      Manulife |  2|  M|18-01-1994|76.34|   250000|     Alaska |
|  3|Preethi Narasingam1| Preethi|Narasingam|        Retail |  3|  F|19-01-1994|77.45|270000.01|    Arizona |
|  4|     Abhishek Nair1|Abhishek|      Nair|       Banking |  4|  M|20-01-1994|78.65|   345000|   Arkansas |
|  5|         Ram Sharma|     Ram|    Sharma|Infrastructure |  5|  M|21-01-1994|79.12|    45000| California |
|  6|    Chandani Kumari|Chandani|    Kumari|          BNFS |  6|  F|22-01-1994|80.13| 43000.02|   Colorado |
|  7|       Balaji Kumar|  Balaji|     Kumar|           MSO |  1|  M|23-01-1994|81.33|  1234678|Connecticut |
|  8|   Naveen Shekrappa|  Naveen| Shekrappa|      Manulife |  2|  M|24-01-1994|  100|   789414|   Delaware |
|  9|      Milind Chavan|  Milind|    Chavan|        Retail |  3|  M|25-01-1994|83.66|   245555|    Florida |
| 10|       Raghu Rajeev|   Raghu|    Rajeev|       Banking |  4|  M|26-01-1994|87.65|   235468|     Georgia|
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+

您只需要执行以下操作:

You'll just need to do the following :

scala> val res1 = source.join(destination, Seq("_c0"), "leftanti")

scala> val res2 = destination.join(source, Seq("_c0"), "leftanti")

这与我在答案中提到的逻辑

It's the same logic I mentioned in my answer here.

这篇关于如何在Spark的过滤条件中使用NOT IN子句的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆