自联接无法与DataFrame API一起正常工作 [英] Self-join not working as expected with the DataFrame API
问题描述
我正在尝试使用自连接从表中获取最新记录.它可以使用spark-sql
起作用,但不能使用spark DataFrame
API起作用.
I am trying to get the latest records from a table using self join. It works using spark-sql
but not working using spark DataFrame
API.
任何人都可以帮忙吗?是虫子吗?
Can anyone help? Is it a bug?
我在本地模式下使用Spark 2.2.0
I am using Spark 2.2.0 in local mode
创建输入DataFrame
:
scala> val df3 = spark.sparkContext.parallelize(Array((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5))).toDF("id","value","time")
df3: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]
scala> val df33 = df3
df33: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]
scala> df3.show
+---+-----+----+
| id|value|time|
+---+-----+----+
| 1| a| 1|
| 1| aa| 2|
| 2| b| 2|
| 2| bb| 5|
+---+-----+----+
scala> df33.show
+---+-----+----+
| id|value|time|
+---+-----+----+
| 1| a| 1|
| 1| aa| 2|
| 2| b| 2|
| 2| bb| 5|
+---+-----+----+
现在使用SQL执行联接:有效
Now performing the join using SQL: works
scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").show
+---+-----+----+
| id|value|time|
+---+-----+----+
| 1| aa| 2|
| 2| bb| 5|
+---+-----+----+
现在使用数据框API执行联接:不起作用
Now performing the join using dataframe API: doesn't work
scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).show
+---+-----+----+
| id|value|time|
+---+-----+----+
+---+-----+----+
需要注意的是解释计划: DataFrame
API的空白!
The thing to notice is the explain plans: blank for the DataFrame
API!!
scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).explain
== Physical Plan ==
LocalTableScan <empty>, [id#150, value#151, time#152]
scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").explain
== Physical Plan ==
*Project [id#1241, value#1242, time#1243]
+- *SortMergeJoin [id#150], [id#1241], Inner, (time#152 < time#1243)
:- *Sort [id#150 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#150, 200)
: +- *Project [_1#146 AS id#150, _3#148 AS time#152]
: +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
: +- Scan ExternalRDDScan[obj#145]
+- *Sort [id#1241 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#1241, 200)
+- *Project [_1#146 AS id#1241, _2#147 AS value#1242, _3#148 AS time#1243]
+- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
+- Scan ExternalRDDScan[obj#145]
推荐答案
不是,这不是bug,但是当您将DataFrame重新分配给您执行的新操作时,它实际上复制了沿袭,但没有重复数据.因此,您将在同一列上进行比较.
No that's not a bug, but when you reassign the DataFrame to a new one like what you have done, it actually copies the lineage but it doesn't duplicate the data. Thus you'll be comparing on the same column.
使用spark.sql
稍有不同,因为它实际上是对DataFrame
s
Use spark.sql
is slightly different because it's actually working on aliases of your DataFrame
s
因此,使用API执行自联接的正确方法实际上是别名 您的DataFrame
,如下所示:
So the correct way to perform a self-join using the API is actually aliasing your DataFrame
as followed :
val df1 = Seq((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5)).toDF("id","value","time")
df1.as("df1").join(df1.as("df2"), $"df1.id" === $"df2.id" && $"df1.time" < $"df2.time").select($"df2.*").show
// +---+-----+----+
// | id|value|time|
// +---+-----+----+
// | 1| aa| 2|
// | 2| bb| 5|
// +---+-----+----+
有关自我联接的更多信息,建议阅读雷切尔·沃伦(Rachel Warren),霍尔顿·卡劳(Holden Karau)的《高性能火花》-第4章.
For more information about self-joins, I recommend reading High Performance Spark by Rachel Warren, Holden Karau - Chapter 4.
这篇关于自联接无法与DataFrame API一起正常工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!