What row is kept by the dropDuplicates operator?


Question

Which row will be kept when one uses the dropDuplicates function on a Spark DataFrame? This is not stated in the Spark documentation.

  1. Keep the first (according to row order)
  2. Keep the last (according to row order)
  3. Random?

P.S. assuming a distributed YARN environment (not master local)

Answer

TL;DR Keep the first (according to row order)

The dropDuplicates operator in Spark SQL creates a logical plan with a Deduplicate operator.

That Deduplicate operator is translated to a First logical operator by Spark SQL's Catalyst Optimizer, which answers the question nicely.

You can see the Deduplicate operator in the logical plan below.

// create datasets with duplicates
val dups = spark.range(9).map(_ % 3)

val q = dups.dropDuplicates
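As a quick sanity check outside Spark, plain Scala reproduces what the dataset contains (this snippet needs no SparkSession and only illustrates the input data):

```scala
// Plain-Scala illustration of the dataset contents: spark.range(9).map(_ % 3)
// yields each id modulo 3, so each of the values 0, 1, 2 appears three times.
val dupValues = (0L until 9L).map(_ % 3)
println(dupValues.mkString(", "))  // 0, 1, 2, 0, 1, 2, 0, 1, 2
```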

The following is the logical plan of the q dataset.

scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

The Deduplicate operator is then translated to a First logical operator (which shows itself as an Aggregate operator after optimizations).

scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))


After spending some time reviewing the code of Apache Spark, the dropDuplicates operator is equivalent to groupBy followed by the first function.

first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.

import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
   +- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
      +- *SerializeFromObject [input[0, bigint, false] AS value#64L]
         +- *MapElements <function1>, obj#63: bigint
            +- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
               +- *Range (0, 9, step=1, splits=8)
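The "keep first" semantics that groupBy plus first implements can be sketched in plain Scala without a Spark cluster. The DedupKeepFirst object and its dropDuplicatesBy helper below are hypothetical names introduced only for illustration, not Spark API; over an ordered sequence they keep the first row seen for each key, which is what the optimized plan above computes per group:

```scala
// Plain-Scala sketch of "keep first" deduplication semantics.
// DedupKeepFirst / dropDuplicatesBy are hypothetical helper names for
// illustration only; this is not Spark API.
object DedupKeepFirst {
  import scala.collection.mutable.LinkedHashMap

  // Keep the first element seen for each key, preserving encounter order.
  def dropDuplicatesBy[A, K](rows: Seq[A])(key: A => K): Seq[A] = {
    val seen = LinkedHashMap.empty[K, A]
    rows.foreach(r => seen.getOrElseUpdate(key(r), r))
    seen.values.toSeq
  }

  def main(args: Array[String]): Unit = {
    // (id, value) rows; rows with ids 3..5 duplicate the values of ids 0..2
    val rows = Seq((0L, 0L), (1L, 1L), (2L, 2L), (3L, 0L), (4L, 1L), (5L, 2L))
    // Keeps the earliest id for each value:
    println(dropDuplicatesBy(rows)(_._2).mkString(", "))  // (0,0), (1,1), (2,2)
  }
}
```

For example, deduplicating the (id, value) rows above by value keeps the earliest id for each distinct value, matching the "keep first according to row order" behavior.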

I also think that the dropDuplicates operator may be more performant.
