Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?


Problem Description


The Apache Spark pyspark.RDD API docs mention that groupByKey() is inefficient. Instead, it is recommended to use reduceByKey(), aggregateByKey(), combineByKey(), or foldByKey(). This results in some of the aggregation being done on the workers prior to the shuffle, thus reducing the shuffling of data across workers.

Given the following data set and groupByKey() expression, what is an equivalent and efficient implementation (reduced cross-worker data shuffling) that does not utilize groupByKey(), but delivers the same result?

dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
       .groupByKey())
print(sorted(rdd.mapValues(list).collect()))

Output:

[('a', [7, 8]), ('b', [3])]

Solution

As far as I can tell, there is nothing to gain in this particular case by using aggregateByKey or a similar function. Since you are building a list there is no "real" reduction, and the amount of data that has to be shuffled is more or less the same.

To really observe a performance gain you need transformations which actually reduce the amount of transferred data, for example counting, computing summary statistics, or finding unique elements.
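
For example, a per-key count is a case where the map-side combine genuinely shrinks the shuffle, because each partition emits at most one partial count per key instead of every record. A minimal sketch, reusing the same sc and sample data as above:

from operator import add

counts = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
          .mapValues(lambda _: 1)   # we only care how many values each key has
          .reduceByKey(add))        # partial counts are merged before the shuffle

print(sorted(counts.collect()))     # [('a', 2), ('b', 1)]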

Regarding the differences and benefits of using reduceByKey(), combineByKey(), or foldByKey(), there is an important conceptual difference which is easier to see when you consider the Scala API signatures.

Both reduceByKey and foldByKey map from RDD[(K, V)] to RDD[(K, V)], while the latter additionally takes a zero element.

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]
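
In PySpark terms both can express a genuine per-key reduction such as summing the values; a small sketch of the two, where the only visible difference is the explicit zero value passed to foldByKey:

from operator import add

pairs = sc.parallelize([("a", 7), ("b", 3), ("a", 8)])

print(sorted(pairs.reduceByKey(add).collect()))   # [('a', 15), ('b', 3)]
print(sorted(pairs.foldByKey(0, add).collect()))  # same result, explicit zero element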

combineByKey (aggregateByKey, not shown here, is the same type of transformation) transforms from RDD[(K, V)] to RDD[(K, C)]:

combineByKey[C](
   createCombiner: (V) ⇒ C,
   mergeValue: (C, V) ⇒ C,
   mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

Going back to your example, only combineByKey (and in PySpark aggregateByKey) is really applicable, since you are transforming from RDD[(String, Int)] to RDD[(String, List[Int])].

While in a dynamic language like Python it is actually possible to perform such an operation using foldByKey or reduceByKey, it makes the semantics of the code unclear and, to cite @tim-peters, "There should be one-- and preferably only one --obvious way to do it" [1].
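
For illustration, such a version could look like the sketch below; it yields the same grouping, but reads as a reduction rather than the grouping it really is:

grouped = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
           .mapValues(lambda v: [v])          # wrap every value in a singleton list
           .reduceByKey(lambda a, b: a + b))  # "reduce" by concatenating the lists

print(sorted(grouped.collect()))              # [('a', [7, 8]), ('b', [3])]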

The difference between aggregateByKey and combineByKey is pretty much the same as between reduceByKey and foldByKey, so for building a list it is mostly a matter of taste:

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .combineByKey(lambda x: [x],
                     lambda u, v: u + [v],
                     lambda u1, u2: u1 + u2))
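
And since the question asks specifically about aggregateByKey, the same grouping spelled with it looks roughly like this; the empty list is the zero value, the first function appends a value within a partition, and the second merges the per-partition lists:

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .aggregateByKey([],                        # zero value: start every key with an empty list
                       lambda acc, v: acc + [v],  # seqFunc: fold one value into the partition-local list
                       lambda a, b: a + b))       # combFunc: merge lists from different partitions

print(sorted(rdd.collect()))                      # [('a', [7, 8]), ('b', [3])]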

And to add some readability, a little bit of toolz:

from toolz import concatv

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       # concatv is lazy, so materialize with list(); mergeValue receives the
       # accumulator first, then the new value
       .combineByKey(lambda x: [x],
                     lambda acc, v: list(concatv(acc, [v])),
                     lambda a, b: list(concatv(a, b))))

Since combineByKey / aggregateByKey perform a map-side reduction, they can create a large number of temporary objects and as a result can be more expensive than groupByKey.

[1] Peters, T. PEP 20 -- The Zen of Python (2004). https://www.python.org/dev/peps/pep-0020/
