Self-join not working as expected with the DataFrame API


Problem description

I am trying to get the latest records from a table using a self-join. It works using spark-sql but not using the Spark DataFrame API.

Can anyone help? Is it a bug?

I am using Spark 2.2.0 in local mode.

Creating the input DataFrame:

scala> val df3 = spark.sparkContext.parallelize(Array((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5))).toDF("id","value","time")
df3: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]    

scala> val df33 = df3
df33: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]

scala> df3.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+

scala> df33.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+

Now performing the join using SQL: works

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|   aa|   2|
|  2|   bb|   5|
+---+-----+----+
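
(Side note: for the spark.sql query above to resolve the names df3 and df33, both DataFrames must have been registered as temporary views; that step does not appear in the transcript. Presumably something like the following was run first:)

scala> df3.createOrReplaceTempView("df3")

scala> df33.createOrReplaceTempView("df33")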

Now performing the join using the DataFrame API: doesn't work

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).show
+---+-----+----+
| id|value|time|
+---+-----+----+
+---+-----+----+

The thing to notice is the explain plans: the one for the DataFrame API is essentially empty!

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).explain
== Physical Plan ==
LocalTableScan <empty>, [id#150, value#151, time#152]

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").explain
== Physical Plan ==
*Project [id#1241, value#1242, time#1243]
+- *SortMergeJoin [id#150], [id#1241], Inner, (time#152 < time#1243)
   :- *Sort [id#150 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#150, 200)
   :     +- *Project [_1#146 AS id#150, _3#148 AS time#152]
   :        +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
   :           +- Scan ExternalRDDScan[obj#145]
   +- *Sort [id#1241 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#1241, 200)
         +- *Project [_1#146 AS id#1241, _2#147 AS value#1242, _3#148 AS time#1243]
            +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
               +- Scan ExternalRDDScan[obj#145]

Recommended answer

No, that's not a bug. When you reassign the DataFrame to a new variable as you have done, it copies the lineage but does not duplicate the data. As a result, both sides of the join resolve to the very same columns, and you end up comparing each column with itself.
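
You can check this directly in the shell: df33 is just another reference to the same DataFrame object, so the join predicate is effectively time < time, which is always false. That is why the optimizer folds the whole query down to an empty LocalTableScan. A minimal check (the exact printed form of the Column may vary by Spark version):

df33 eq df3                               // true: val df33 = df3 copies the reference, not the data
df3.col("time") < df33.col("time")        // resolves to (time < time), which can never hold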

Using spark.sql is slightly different because it actually works on aliases of your DataFrames.

So the correct way to perform a self-join using the API is to alias your DataFrame, as follows:

val df1 = Seq((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5)).toDF("id","value","time")

df1.as("df1").join(df1.as("df2"), $"df1.id" === $"df2.id" && $"df1.time" < $"df2.time").select($"df2.*").show
// +---+-----+----+
// | id|value|time|
// +---+-----+----+
// |  1|   aa|   2|
// |  2|   bb|   5|
// +---+-----+----+
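
As an aside, for the original goal of keeping only the latest record per id, a window function avoids the self-join entirely. A minimal sketch using the same df1 (outside the spark-shell you would also need import spark.implicits._ for the $ syntax):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Rank each id's rows by time, newest first, and keep only the first one.
val latest = df1
  .withColumn("rn", row_number().over(Window.partitionBy("id").orderBy($"time".desc)))
  .where($"rn" === 1)
  .drop("rn")

latest.show
// +---+-----+----+
// | id|value|time|
// +---+-----+----+
// |  1|   aa|   2|
// |  2|   bb|   5|
// +---+-----+----+
// (row order may vary)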

For more information about self-joins, I recommend reading High Performance Spark by Holden Karau and Rachel Warren, Chapter 4.

