Spark 2.0 Datasets groupByKey and divide operation and type safety

Question

I am very pleased with Spark 2.0 Datasets because of their compile-time type safety. But here are a couple of problems that I have not been able to work out, and for which I also could not find good documentation.

Problem #1 - divide operation on an aggregated column - Consider the code below: I have a Dataset[MyClass] and I want to groupByKey on c1, c2, c3 and compute sum(c4) / 8. The code below works fine if I just calculate the sum, but it gives a compile-time error for divide(8). I wonder how I can achieve the following.

final case class MyClass(c1: String,
                         c2: String,
                         c3: String,
                         c4: Double)

import org.apache.spark.sql.Dataset
import sparkSession.implicits._
import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}

val myCaseClass: Dataset[MyClass] = ??? // assume it's being loaded

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .agg(typedSum[MyClass](_.c4).name("sum(c4)")
    .divide(8)) // this breaks with a compile-time error
  .show()

If I remove the .divide(8) operation and run the above, it gives me the output below.

+-----------+-------------+
|        key|sum(c4)      |
+-----------+-------------+
| [A1,F2,S1]|         80.0|
| [A1,F1,S1]|         40.0|  
+-----------+-------------+

Problem #2 - converting the groupByKey result to another typed Dataset - The second part of my problem is that I want the output to again be a typed Dataset. For that I have another case class (not sure if it is needed), but I am not sure how to map the grouped result to it:

final case class AnotherClass(c1: String,
                              c2: String,
                              c3: String,
                              average: Double)

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .agg(typedSum[MyClass](_.c4).name("sum(c4)"))
  .as[AnotherClass] // this is breaking with an exception

But this again fails with an exception, because the grouped-by-key result does not map directly onto AnotherClass.

PS: any other solution to achieve the above is more than welcome.

Answer

The first problem can be resolved by using typed columns all the way down (KeyValueGroupedDataset.agg expects TypedColumn(s)). You can define the aggregation result as:

import org.apache.spark.sql.functions.lit

val eight = lit(8.0)
  .as[Double]  // Not necessary

val sumByEight = typedSum[MyClass](_.c4)
  .divide(eight)
  .as[Double]  // Required
  .name("div(sum(c4), 8)")

and plug it into the following code (the .as[Double] after divide is required because Column.divide returns an untyped Column, which has to be converted back to a TypedColumn before it can be passed to agg):

val myCaseClass = Seq(
  MyClass("a", "b", "c", 2.0),
  MyClass("a", "b", "c", 3.0)
).toDS

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .agg(sumByEight)
  .show()

to get:

+-------+---------------+
|    key|div(sum(c4), 8)|
+-------+---------------+
|[a,b,c]|          0.625|
+-------+---------------+

The second problem is the result of using a class that doesn't conform to the shape of the data. A correct representation could be:

case class AnotherClass(key: (String, String, String), sum: Double)

which, used with the data defined above:

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .agg(typedSum[MyClass](_.c4).name("sum"))
  .as[AnotherClass]

will give:

+-------+---+
|    key|sum|
+-------+---+
|[a,b,c]|5.0|
+-------+---+

But .as[AnotherClass] is not necessary here if a Dataset[((String, String, String), Double)] is acceptable.
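
If you do want a flat, four-field record instead (closer to the shape asked for in the question), a minimal sketch is to map the tuple result explicitly. Flattened is a hypothetical case class introduced here for illustration; the myCaseClass Dataset and the typedSum alias are assumed from the question:

// Hedged sketch: build flat typed records by hand from the tuple result.
// `Flattened` is hypothetical, not part of the original question or answer.
final case class Flattened(c1: String, c2: String, c3: String, sum: Double)

val flattened: Dataset[Flattened] = myCaseClass
  .groupByKey(m => (m.c1, m.c2, m.c3))
  .agg(typedSum[MyClass](_.c4).name("sum"))
  .map { case ((c1, c2, c3), sum) => Flattened(c1, c2, c3, sum) }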

You can of course skip all of that and just use mapGroups (although not without a performance penalty):

import shapeless.syntax.std.tuple._   // A little bit of shapeless

val tuples = myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .mapGroups((group, iter) => group :+ iter.map(_.c4).sum)

with the result:

+---+---+---+---+   
| _1| _2| _3| _4|
+---+---+---+---+
|  a|  b|  c|5.0|
+---+---+---+---+
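
If you would rather not add the shapeless dependency, a small sketch with a plain pattern match (same assumptions as above) produces the same tuples:

// Hedged sketch: the same mapGroups result without shapeless
val tuplesNoShapeless = myCaseClass
  .groupByKey(m => (m.c1, m.c2, m.c3))
  .mapGroups { case ((c1, c2, c3), iter) => (c1, c2, c3, iter.map(_.c4).sum) }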

reduceGroups could be a better choice:

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .reduceGroups((x, y) => x.copy(c4=x.c4 + y.c4))

with the resulting Dataset:

+-------+-----------+    
|     _1|         _2|
+-------+-----------+
|[a,b,c]|[a,b,c,5.0]|
+-------+-----------+
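
To tie this back to Problem #1, a sketch could map the reduced result into a flat, typed record while performing the division. This assumes, as in the question, that the average is simply sum(c4) divided by the fixed constant 8; Averaged is a hypothetical case class introduced here:

// Hedged sketch: reduceGroups followed by the division from Problem #1.
// `Averaged` is hypothetical, not part of the original question or answer.
final case class Averaged(c1: String, c2: String, c3: String, average: Double)

val averaged: Dataset[Averaged] = myCaseClass
  .groupByKey(m => (m.c1, m.c2, m.c3))
  .reduceGroups((x, y) => x.copy(c4 = x.c4 + y.c4))
  .map { case ((c1, c2, c3), reduced) => Averaged(c1, c2, c3, reduced.c4 / 8) }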
