DataFrame / Dataset groupBy behaviour/optimization


Question

Suppose we have a DataFrame df consisting of the following columns:

Name, Surname, Size, Width, Length, Weight

Now we want to perform a couple of operations; for example, we want to create a couple of DataFrames containing data about Size and Width:

val df1 = df.groupBy("surname").agg( sum("size") )
val df2 = df.groupBy("surname").agg( sum("width") )

As you can notice, other columns, like Length, are not used anywhere. Is Spark smart enough to drop the redundant columns before the shuffling phase, or are they carried around? Will running:

val dfBasic = df.select("surname", "size", "width")

before grouping somehow affect the performance?

Answer

Yes, it is "smart enough". groupBy performed on a DataFrame is not the same operation as groupBy performed on a plain RDD. In the scenario you've described, there is no need to move the raw data at all. Let's create a small example to illustrate that:

import org.apache.spark.sql.functions.sum  // imported automatically in spark-shell
import spark.implicits._                   // for toDF and the $-column syntax

val df = sc.parallelize(Seq(
   ("a", "foo", 1), ("a", "foo", 3), ("b", "bar", 5), ("b", "bar", 1)
)).toDF("x", "y", "z")

df.groupBy("x").agg(sum($"z")).explain

// == Physical Plan ==
// *HashAggregate(keys=[x#148], functions=[sum(cast(z#150 as bigint))])
// +- Exchange hashpartitioning(x#148, 200)
//    +- *HashAggregate(keys=[x#148], functions=[partial_sum(cast(z#150 as bigint))])
//       +- *Project [_1#144 AS x#148, _3#146 AS z#150]
//          +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#144, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#145, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#146]
//             +- Scan ExternalRDDScan[obj#143]

As you can see, the first phase is a projection where only the required columns are preserved. Next, the data is aggregated locally, and finally it is transferred and aggregated globally. You'll get slightly different output if you use Spark <= 1.4, but the general structure should be exactly the same.

Finally, the DAG visualization for this job confirms that the description above matches the actual execution.
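As a quick check on the second part of the question (a sketch, not from the original answer): manually projecting the needed columns first produces an equivalent physical plan, because the optimizer already prunes the unused columns.

df.select("x", "z").groupBy("x").agg(sum($"z")).explain

// The physical plan is equivalent to the one shown above: a Project keeping
// only x and z, a partial HashAggregate, an Exchange, and a final
// HashAggregate, so the extra select should not change performance here.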

Similarly, Dataset.groupByKey followed by reduceGroups contains both a map-side phase (ObjectHashAggregate with partial_reduceaggregator) and a reduce-side phase (ObjectHashAggregate with a reduceaggregator reduction):

case class Foo(x: String, y: String, z: Int)

val ds = df.as[Foo]
ds.groupByKey(_.x).reduceGroups((x, y) => x.copy(z = x.z + y.z)).explain

// == Physical Plan ==
// ObjectHashAggregate(keys=[value#126], functions=[reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@54d90261, Some(newInstance(class $line40.$read$$iw$$iw$Foo)), Some(class $line40.$read$$iw$$iw$Foo), Some(StructType(StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#128, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(x, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).x, true, false) AS x#25, y, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).y, true, false) AS y#26, z, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).z AS z#27) AS _2#129, newInstance(class scala.Tuple2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).x, true, false) AS x#25, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).y, true, false) AS y#26, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).z AS z#27, StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false), true, 0, 0)])
// +- Exchange hashpartitioning(value#126, 200)
//    +- ObjectHashAggregate(keys=[value#126], functions=[partial_reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@54d90261, Some(newInstance(class $line40.$read$$iw$$iw$Foo)), Some(class $line40.$read$$iw$$iw$Foo), Some(StructType(StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#128, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(x, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).x, true, false) AS x#25, y, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).y, true, false) AS y#26, z, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).z AS z#27) AS _2#129, newInstance(class scala.Tuple2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).x, true, false) AS x#25, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).y, true, false) AS y#26, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).z AS z#27, StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false), true, 0, 0)])
//       +- AppendColumns <function1>, newInstance(class $line40.$read$$iw$$iw$Foo), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#126]
//          +- *Project [_1#4 AS x#8, _2#5 AS y#9, _3#6 AS z#10]
//             +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#5, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#6]
//                +- Scan ExternalRDDScan[obj#3]

However, other methods of KeyValueGroupedDataset may work similarly to RDD.groupByKey. For example, mapGroups (or flatMapGroups) does not use partial aggregation; in the plan below there is no partial_* phase, so all rows are shuffled before the function is applied:

ds.groupByKey(_.x)
  .mapGroups((_, iter) => iter.reduce((x, y) => x.copy(z = x.z + y.z)))
  .explain

//== Physical Plan ==
//*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).x, true, false) AS x#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).y, true, false) AS y#38, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).z AS z#39]
//+- MapGroups <function2>, value#32.toString, newInstance(class $line15.$read$$iw$$iw$Foo), [value#32], [x#8, y#9, z#10], obj#36: $line15.$read$$iw$$iw$Foo
//   +- *Sort [value#32 ASC NULLS FIRST], false, 0
//      +- Exchange hashpartitioning(value#32, 200)
//         +- AppendColumns <function1>, newInstance(class $line15.$read$$iw$$iw$Foo), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#32]
//            +- *Project [_1#4 AS x#8, _2#5 AS y#9, _3#6 AS z#10]
//               +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#5, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#6]
//                  +- Scan ExternalRDDScan[obj#3]
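As a side note (not part of the original answer), if you need map-side combine with the typed API, one option is a custom Aggregator passed to agg instead of mapGroups. A minimal sketch, assuming the Foo and ds definitions above:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical aggregator summing z; buffer and output are both Long.
val sumZ = new Aggregator[Foo, Long, Long] {
  def zero: Long = 0L                            // initial buffer value
  def reduce(b: Long, a: Foo): Long = b + a.z    // fold one record into the buffer
  def merge(b1: Long, b2: Long): Long = b1 + b2  // combine partial buffers
  def finish(r: Long): Long = r                  // produce the final result
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

ds.groupByKey(_.x).agg(sumZ.toColumn).explain

// Unlike mapGroups, the resulting plan again contains a partial_* aggregate
// below the Exchange, so each partition combines its groups before the shuffle.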
