Rolling your own reduceByKey in Spark Dataset
Problem Description
I'm trying to learn to use DataFrames and Datasets more, in addition to RDDs. For an RDD, I know I can do someRDD.reduceByKey((x, y) => x + y), but I don't see that function for Dataset. So I decided to write one.
import scala.collection.mutable

someRdd.map(x => ((x.fromId, x.toId), 1))
  .map(x => mutable.Map(x))
  .reduce((x, y) => {
    // Merge the two maps, summing the counts for keys that appear in both.
    val result = mutable.HashMap.empty[(Long, Long), Int]
    val keys = mutable.HashSet.empty[(Long, Long)]
    y.keys.foreach(z => keys += z)
    x.keys.foreach(z => keys += z)
    for (elem <- keys) {
      val s1 = if (x.contains(elem)) x(elem) else 0
      val s2 = if (y.contains(elem)) y(elem) else 0
      result(elem) = s1 + s2
    }
    result
  })
However, this returns everything to the driver. How would you write this so that it returns a Dataset? Maybe use mapPartitions and do it there?
Note: this compiles but does not run, because Spark doesn't have encoders for Map yet.
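
As an aside, a minimal sketch of one way to supply the missing encoder, assuming Spark 2.x and a Kryo-backed binary encoder (the implicit name mapEncoder is arbitrary, not from the original post):

import org.apache.spark.sql.{Encoder, Encoders}
import scala.collection.mutable

// Assumption: Spark ships no built-in encoder for mutable.Map, but a
// Kryo-backed binary encoder can stand in, letting the map/reduce above
// execute at the cost of opaque serialization between stages.
implicit val mapEncoder: Encoder[mutable.Map[(Long, Long), Int]] =
  Encoders.kryo[mutable.Map[(Long, Long), Int]]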
Recommended Answer
I assume your goal is to translate this idiom to Datasets:
rdd.map(x => (x.someKey, x.someField))
  .reduceByKey(_ + _)
// => returning an RDD of (KeyType, FieldType)
Currently, the closest solution I have found with the Dataset API looks like this:
ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]
// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, the map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we
//     do not want to reduce over the original type, but over the FieldType.
// [2] Required since reduceGroups converts back to a Dataset[(K, V)],
//     not knowing that our V's are already key-value pairs.
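
For concreteness, here is a self-contained sketch of this pattern; the Edge case class (modeled on the question's fromId/toId fields) and the local SparkSession setup are illustrative assumptions, not part of the original answer:

import org.apache.spark.sql.SparkSession

// Hypothetical record type, borrowing the question's field names.
case class Edge(fromId: Long, toId: Long)

object ReduceByKeyOnDataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("reduceByKey-on-Dataset")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(Edge(1L, 2L), Edge(1L, 2L), Edge(2L, 3L)).toDS()

    // Count occurrences of each (fromId, toId) pair, reduceByKey-style.
    val counts = ds
      .map(e => ((e.fromId, e.toId), 1))
      .groupByKey(_._1)
      .reduceGroups((a, b) => (a._1, a._2 + b._2))
      .map(_._2) // drop the duplicate key that reduceGroups prepends

    counts.show() // ((1,2),2) and ((2,3),1)

    spark.stop()
  }
}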
Doesn't look very elegant and according to a quick benchmark it is also much less performant, so maybe we are missing something here...
Note: An alternative might be to use groupByKey(_.someKey) as the first step. The problem is that groupByKey changes the type from a regular Dataset to a KeyValueGroupedDataset. The latter does not have a regular map function; instead it offers mapGroups, which does not seem very convenient because it wraps the values into an Iterator and, according to the docstring, performs a shuffle.
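
For comparison, a hedged sketch of what the mapGroups alternative would look like, reusing the hypothetical Edge Dataset from the sketch above (the grouped values arrive as an Iterator, so the aggregation happens inside the group function):

// Continuing from the sketch above (spark.implicits._ still in scope).
// mapGroups hands us each key plus an Iterator over its values; per the
// docstring it does not support partial aggregation and shuffles all data.
val countsViaMapGroups = ds
  .groupByKey(e => (e.fromId, e.toId))
  .mapGroups((key, values) => (key, values.size))
// => Dataset of ((Long, Long), Int), same result as the reduceGroups
//    version but without map-side combining.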