Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?


Question

The Apache Spark pyspark.RDD API docs mention that groupByKey() is inefficient. Instead, it is recommended to use reduceByKey(), aggregateByKey(), combineByKey(), or foldByKey(). These do some of the aggregation on the workers prior to the shuffle, thus reducing the shuffling of data across workers.
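As a minimal sketch of that pre-shuffle aggregation (assuming a SparkContext sc is already available, as in the snippets below): reduceByKey() combines values per key within each partition first, so only one partial result per key per partition crosses the network.

from operator import add

pairs = sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
sums = pairs.reduceByKey(add)       # partial sums are computed per partition before the shuffle
print(sorted(sums.collect()))       # [('a', 15), ('b', 3)]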

Given the following data set and groupByKey() expression, what is an equivalent and efficient implementation (with reduced cross-worker data shuffling) that does not use groupByKey(), but delivers the same result?

dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
       .groupByKey())
print sorted(rdd.mapValues(list).collect())

Output:

[('a', [7, 8]), ('b', [3])]

Answer

As far as I can tell, there is nothing to gain* in this particular case by using aggregateByKey or a similar function. Since you are building a list, there is no "real" reduction, and the amount of data that has to be shuffled is more or less the same.

To really observe a performance gain, you need transformations that actually reduce the amount of transferred data, for example counting, computing summary statistics, or finding unique elements.
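For example, counting values per key is exactly this kind of reduction; a small sketch (again assuming sc), where each partition ships only a single count per key rather than every record:

from operator import add

counts = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
          .mapValues(lambda _: 1)   # replace each value with 1
          .reduceByKey(add))        # per-partition counts are combined before the shuffle
print(sorted(counts.collect()))     # [('a', 2), ('b', 1)]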

Regarding the respective benefits of using reduceByKey(), combineByKey(), or foldByKey(), there is an important conceptual difference which is easier to see when you consider the Scala API signatures.

Both reduceByKey and foldByKey map from RDD[(K, V)] to RDD[(K, V)], while the second one takes an additional zero element.

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]
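In PySpark the same distinction holds; a brief sketch (assuming sc), where foldByKey takes the extra zero element but both keep the value type unchanged:

from operator import add

pairs = sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
print(sorted(pairs.reduceByKey(add).collect()))   # [('a', 15), ('b', 3)]
print(sorted(pairs.foldByKey(0, add).collect()))  # [('a', 15), ('b', 3)]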

combineByKey (aggregateByKey is not shown here, but it is the same type of transformation) transforms from RDD[(K, V)] to RDD[(K, C)]:

combineByKey[C](
   createCombiner: (V) ⇒ C,
   mergeValue: (C, V) ⇒ C,
   mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

Going back to your example, only combineByKey (and, in PySpark, aggregateByKey) is really applicable, since you are transforming from RDD[(String, Int)] to RDD[(String, List[Int])].

While in a dynamic language like Python it is actually possible to perform such an operation using foldByKey or reduceByKey, it makes the semantics of the code unclear and, to cite @tim-peters, "There should be one-- and preferably only one --obvious way to do it" [1].
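As a hypothetical illustration of that workaround (not code from the answer itself): wrapping each value in a one-element list and concatenating with reduceByKey gives the same result, but it hides the fact that no real reduction happens and builds many intermediate lists.

grouped = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
           .mapValues(lambda v: [v])          # wrap each value in a one-element list
           .reduceByKey(lambda a, b: a + b))  # "reduce" by list concatenation
print(sorted(grouped.collect()))              # [('a', [7, 8]), ('b', [3])]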

The difference between aggregateByKey and combineByKey is pretty much the same as between reduceByKey and foldByKey, so for building a list it is mostly a matter of taste:

def merge_value(acc, x):
    # mergeValue: add a single value to the per-partition accumulator
    acc.append(x)
    return acc

def merge_combiners(acc1, acc2):
    # mergeCombiners: merge two per-partition accumulators
    acc1.extend(acc2)
    return acc1

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .combineByKey(
           lambda x: [x],   # createCombiner: start a new list from the first value
           merge_value,
           merge_combiners))
print(sorted(rdd.collect()))  # [('a', [7, 8]), ('b', [3])]
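
For comparison, the aggregateByKey variant can be sketched as follows (reusing the merge functions defined above; PySpark copies the zero value for each new key, so an empty list is safe here):

rdd_agg = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
           .aggregateByKey([], merge_value, merge_combiners))
print(sorted(rdd_agg.collect()))  # [('a', [7, 8]), ('b', [3])]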

In practice, though, you should prefer groupByKey. The PySpark implementation is significantly more optimized than a naive implementation like the one provided above.

1. Peters, T. PEP 20 -- The Zen of Python. (2004). At https://www.python.org/dev/peps/pep-0020/

* In practice there is actually quite a lot to lose here, especially when using PySpark. The Python implementation of groupByKey is significantly more optimized than a naive combine-by-key. You can check Be Smart About groupByKey, created by me and @eliasah, for additional discussion.
