Spark 2.0 DataSets groupByKey and divide operation and type safety
Problem description
I am very much pleased with Spark 2.0 Datasets because of their compile-time type safety. But here are a couple of problems that I have not been able to work out, and I also didn't find good documentation for them.
Problem #1 - divide operation on an aggregated column - Consider the code below: I have a Dataset[MyClass] and I want to groupByKey on c1, c2, c3 and compute sum(c4) / 8. The code works well if I just calculate the sum, but it gives a compile-time error for divide(8). I wonder how I can achieve the following.
final case class MyClass(c1: String,
                         c2: String,
                         c3: String,
                         c4: Double)

val myCaseClass: Dataset[MyClass] = ??? // assume it's being loaded

import sparkSession.implicits._
import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}

myCaseClass.
  groupByKey(myCaseClass =>
    (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
  agg(typedSum[MyClass](_.c4).name("sum(c4)").
    divide(8)). // this breaks with a compile-time error
  show()
If I remove the .divide(8) operation and run the above command, it gives me the output below.
+-----------+-------------+
| key|sum(c4) |
+-----------+-------------+
| [A1,F2,S1]| 80.0|
| [A1,F1,S1]| 40.0|
+-----------+-------------+
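For intuition, the intended computation can be sketched with plain Scala collections (no Spark involved; the sample rows below are assumptions chosen to reproduce the sums 80.0 and 40.0 shown above): group by (c1, c2, c3), sum c4, then divide by 8.

```scala
// Plain-collections sketch of "groupByKey on c1, c2, c3 and sum(c4) / 8".
final case class MyClass(c1: String, c2: String, c3: String, c4: Double)

// Hypothetical sample data matching the sums in the output above.
val rows = Seq(
  MyClass("A1", "F2", "S1", 30.0),
  MyClass("A1", "F2", "S1", 50.0),
  MyClass("A1", "F1", "S1", 40.0)
)

val divided: Map[(String, String, String), Double] =
  rows
    .groupBy(r => (r.c1, r.c2, r.c3))                            // group by the 3-column key
    .map { case (key, group) => key -> group.map(_.c4).sum / 8 } // sum c4, divide by 8
```

This is exactly the per-group arithmetic the typed Spark query is meant to express; the difficulty in the question is only about expressing the division on a TypedColumn.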
Problem #2 - converting the groupByKey result to another typed Dataset - The second part of my problem is that I want the output to be a typed Dataset again. For that I have another case class (not sure if it is needed), but I am not sure how to map the grouped result to it:
final case class AnotherClass(c1: String,
                              c2: String,
                              c3: String,
                              average: Double)

myCaseClass.
  groupByKey(myCaseClass =>
    (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
  agg(typedSum[MyClass](_.c4).name("sum(c4)")).
  as[AnotherClass] // this breaks with an exception
but this again fails with an exception, because the grouped-by-key result does not map directly onto AnotherClass.
PS: any other solution to achieve the above is more than welcome.
Answer
The first problem can be resolved by using typed columns all the way down (KeyValueGroupedDataset.agg expects TypedColumn(s)).
You can define the aggregation result as:
import org.apache.spark.sql.functions.lit

val eight = lit(8.0)
  .as[Double] // not necessary
val sumByEight = typedSum[MyClass](_.c4)
  .divide(eight)
  .as[Double] // required
  .name("div(sum(c4), 8)")
and plug it into the following code:
val myCaseClass = Seq(
MyClass("a", "b", "c", 2.0),
MyClass("a", "b", "c", 3.0)
).toDS
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(sumByEight)
to get:
+-------+---------------+
| key|div(sum(c4), 8)|
+-------+---------------+
|[a,b,c]| 0.625|
+-------+---------------+
The second problem is a result of using a class which doesn't conform to the data shape. A correct representation could be:
case class AnotherClass(key: (String, String, String), sum: Double)
With the data defined above:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(typedSum[MyClass](_.c4).name("sum"))
.as[AnotherClass]
this will give:
+-------+---+
| key|sum|
+-------+---+
|[a,b,c]|5.0|
+-------+---+
but .as[AnotherClass] is not necessary here if Dataset[((String, String, String), Double)] is acceptable.
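If the question's original flat shape AnotherClass(c1, c2, c3, average) is still wanted, one option (a sketch of my own, not from the answer above) is to map each ((String, String, String), Double) pair into the flat case class; on a Dataset this would be a .map over the rows, shown here with plain collections:

```scala
// The question's original flat case class.
final case class AnotherClass(c1: String, c2: String, c3: String, average: Double)

// Hypothetical grouped result: key tuple paired with sum(c4).
val pairs = Seq((("a", "b", "c"), 5.0))

// Flatten the key tuple and apply the division from Problem #1.
val flat: Seq[AnotherClass] =
  pairs.map { case ((c1, c2, c3), sum) => AnotherClass(c1, c2, c3, sum / 8) }
```

The same pattern-match works in Dataset.map, since the result of the typed aggregation is a Dataset of key/value tuples.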
You can of course skip all of that and just mapGroups (although not without a performance penalty):
import shapeless.syntax.std.tuple._ // A little bit of shapeless
val tuples = myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.mapGroups((group, iter) => group :+ iter.map(_.c4).sum)
with the result:
+---+---+---+---+
| _1| _2| _3| _4|
+---+---+---+---+
| a| b| c|5.0|
+---+---+---+---+
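Semantically, the mapGroups step above corresponds to the following plain-collections sketch (no Spark or shapeless; the shapeless :+ above only appends the sum to the key tuple, which is spelled out here by hand):

```scala
final case class MyClass(c1: String, c2: String, c3: String, c4: Double)

val data = Seq(
  MyClass("a", "b", "c", 2.0),
  MyClass("a", "b", "c", 3.0)
)

// Equivalent of groupByKey(...).mapGroups((group, iter) => group :+ sum):
// build a 4-tuple from the 3-tuple key plus the summed c4 of the group.
val tuples: Seq[(String, String, String, Double)] =
  data
    .groupBy(r => (r.c1, r.c2, r.c3))
    .map { case ((c1, c2, c3), group) => (c1, c2, c3, group.map(_.c4).sum) }
    .toSeq
```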
reduceGroups could be a better option:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.reduceGroups((x, y) => x.copy(c4=x.c4 + y.c4))
with a Dataset as the result:
+-------+-----------+
| _1| _2|
+-------+-----------+
|[a,b,c]|[a,b,c,5.0]|
+-------+-----------+
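The reduceGroups variant folds each group's rows into a single MyClass by accumulating c4 via copy, which is why the value column above is a full [a,b,c,5.0] record rather than a bare sum. A plain-collections analogue (a sketch, outside Spark):

```scala
final case class MyClass(c1: String, c2: String, c3: String, c4: Double)

val data = Seq(
  MyClass("a", "b", "c", 2.0),
  MyClass("a", "b", "c", 3.0)
)

// Equivalent of reduceGroups((x, y) => x.copy(c4 = x.c4 + y.c4)):
// each group collapses to one representative row carrying the summed c4.
val reduced: Map[(String, String, String), MyClass] =
  data
    .groupBy(r => (r.c1, r.c2, r.c3))
    .map { case (key, group) =>
      key -> group.reduce((x, y) => x.copy(c4 = x.c4 + y.c4))
    }
```

Note that the other fields of the reduced MyClass come from the first row of the group, which is safe here because they are exactly the grouping key.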