在 Spark 数据集中滚动你自己的 reduceByKey [英] Rolling your own reduceByKey in Spark Dataset

查看:13
本文介绍了在 Spark 数据集中滚动你自己的 reduceByKey的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

除了 RDD 之外,我正在尝试学习更多地使用 DataFrames 和 DataSets.对于 RDD,我知道我可以执行 someRDD.reduceByKey((x,y) => x + y),但我看不到 Dataset 的那个函数.所以我决定写一个.

I'm trying to learn to use DataFrames and DataSets more in addition to RDDs. For an RDD, I know I can do someRDD.reduceByKey((x,y) => x + y), but I don't see that function for Dataset. So I decided to write one.

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
  val result = mutable.HashMap.empty[(Long,Long),Int]
  val keys = mutable.HashSet.empty[(Long,Long)]
  y.keys.foreach(z => keys += z)
  x.keys.foreach(z => keys += z)
  for (elem <- keys) {
    val s1 = if(x.contains(elem)) x(elem) else 0
    val s2 = if(y.contains(elem)) y(elem) else 0
    result(elem) = s1 + s2
  }
  result
})

但是,这会将所有内容返回给驱动程序.您将如何编写此代码以返回 Dataset?也许 mapPartition 并在那里做?

However, this returns everything to the driver. How would you write this to return a Dataset? Maybe mapPartition and do it there?

请注意,它会编译但不会运行,因为它还没有 Map 的编码器

Note this compiles but does not run because it doesn't have encoders for Map yet

推荐答案

我假设你的目标是将这个成语翻译成数据集:

I assume your goal is to translate this idiom to Datasets:

rdd.map(x => (x.someKey, x.someField))
   .reduceByKey(_ + _)

// => returning an RDD of (KeyType, FieldType)

目前,我使用 Dataset API 找到的最接近的解决方案如下所示:

Currently, the closest solution I have found with the Dataset API looks like this:

ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)                            
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]

// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, having a map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we do
//     not want to reduce over the original type, but the FieldType.
// [2] required since reduceGroups converts back to Dataset[(K, V)]
//     not knowing that our V's are already key-value pairs.

看起来不是很优雅,根据快速基准测试,它的性能也差很多,所以也许我们在这里遗漏了一些东西......

Doesn't look very elegant and according to a quick benchmark it is also much less performant, so maybe we are missing something here...

注意:另一种方法可能是使用 groupByKey(_.someKey) 作为第一步.问题是使用 groupByKey 将类型从常规 Dataset 更改为 KeyValueGroupedDataset.后者没有常规的 map 功能.相反,它提供了一个 mapGroups,这似乎不太方便,因为它将值包装到一个 Iterator 中并根据文档字符串执行 shuffle.

Note: An alternative might be to use groupByKey(_.someKey) as a first step. The problem is that using groupByKey changes the type from a regular Dataset to a KeyValueGroupedDataset. The latter does not have a regular map function. Instead it offers an mapGroups, which does not seem very convenient because it wraps the values into an Iterator and performs a shuffle according to the docstring.

这篇关于在 Spark 数据集中滚动你自己的 reduceByKey的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆