What row is used in dropDuplicates operator?
Question
Which row will be kept when one uses the dropDuplicates function on a Spark DataFrame? It is not stated in the Spark documentation.
- Keep the first (according to row order)?
- Keep the last (according to row order)?
- Random?
P.S. assuming a distributed YARN environment (not local master).
Answer
TL;DR Keep First (according to row order)
The dropDuplicates operator in Spark SQL creates a logical plan with a Deduplicate operator.
That Deduplicate operator is translated to a First logical operator by Spark SQL's Catalyst Optimizer, which answers your question nicely (!)
You can see the Deduplicate operator in the logical plan below.
// create datasets with duplicates
val dups = spark.range(9).map(_ % 3)
val q = dups.dropDuplicates
The following is the logical plan of the q dataset.
scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
The Deduplicate operator is then translated to a First logical operator (which shows up as an Aggregate operator after optimizations).
scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
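Since the dups dataset above has only one column, you cannot actually see which duplicate survived. As a plain-Scala sketch (no Spark involved; the Row case class and its key/payload field names are made up for illustration), the keep-first semantics look like this:

```scala
// Plain-Scala model of keep-first deduplication (not Spark itself):
// for each key, the first row in row order survives.
// Row, key and payload are hypothetical names used only for illustration.
case class Row(key: Long, payload: String)

def dropDuplicatesModel(rows: Seq[Row]): Seq[Row] =
  rows.foldLeft((Vector.empty[Row], Set.empty[Long])) {
    case ((kept, seen), r) =>
      if (seen(r.key)) (kept, seen)   // key already seen: drop this row
      else (kept :+ r, seen + r.key)  // first occurrence: keep this row
  }._1

val rows = Seq(Row(0, "a"), Row(1, "b"), Row(0, "c"), Row(2, "d"), Row(1, "e"))
val deduped = dropDuplicatesModel(rows)
// deduped == Vector(Row(0, "a"), Row(1, "b"), Row(2, "d"))
```

Note that in a distributed run the global row order is only well-defined within a partition, so which duplicate counts as "first" depends on how the data is partitioned.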
After spending some time reviewing the Apache Spark code, I found that the dropDuplicates operator is equivalent to groupBy followed by the first function.
first(columnName: String, ignoreNulls: Boolean): Column Aggregate function: returns the first value of a column in a group.
import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
+- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
+- *SerializeFromObject [input[0, bigint, false] AS value#64L]
+- *MapElements <function1>, obj#63: bigint
+- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
+- *Range (0, 9, step=1, splits=8)
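The two HashAggregate steps in the physical plan above can be read as a two-phase first: partial_first runs inside each partition, and after the exchange a final first picks one value per key. A plain-Scala sketch of that flow (the partition contents and names below are assumptions for illustration, not Spark's actual implementation):

```scala
// Two-phase aggregation sketch mirroring the physical plan (not Spark itself):
// phase 1 (partial_first) picks the first value per key inside each partition,
// phase 2 (first after the exchange) picks the first partial result per key.
// The partition layout here is hypothetical.
val partitions: Seq[Seq[(Long, String)]] =
  Seq(Seq((0L, "a"), (1L, "b"), (0L, "c")),   // partition 0
      Seq((1L, "d"), (2L, "e"), (2L, "f")))   // partition 1

// partial_first: first value per key within each partition
// (groupBy preserves element order within each group, so head is the first)
val partial: Seq[(Long, String)] =
  partitions.flatMap(_.groupBy(_._1).map { case (k, vs) => (k, vs.head._2) })

// exchange + final first: first partial result per key across partitions
val result: Map[Long, String] =
  partial.groupBy(_._1).map { case (k, vs) => (k, vs.head._2) }
// result == Map(0L -> "a", 1L -> "b", 2L -> "e")
```

This also shows why row order matters only within a partition: the final first sees one candidate per key from each partition, in partition order.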
I also think that the dropDuplicates operator may be more performant.