Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?
Question
The Apache Spark pyspark.RDD API docs mention that groupByKey() is inefficient. Instead, it is recommended to use reduceByKey(), aggregateByKey(), combineByKey(), or foldByKey(). These perform some of the aggregation in the workers prior to the shuffle, thus reducing the amount of data shuffled across workers.
Given the following data set and groupByKey() expression, what is an equivalent and efficient implementation (with reduced cross-worker data shuffling) that does not use groupByKey(), but delivers the same result?
dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
       .groupByKey())
print(sorted(rdd.mapValues(list).collect()))
Output:
[('a', [7, 8]), ('b', [3])]
Answer
As far as I can tell there is nothing to gain* in this particular case by using aggregateByKey or a similar function. Since you're building a list, there is no "real" reduction, and the amount of data which has to be shuffled is more or less the same.
To really observe a performance gain you need transformations which actually reduce the amount of transferred data, for example counting, computing summary statistics, or finding unique elements.
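For instance, a per-key count shrinks each key's values to a single integer before the shuffle. Here is a minimal sketch of why that helps, using plain Python as a stand-in for Spark's map-side combine; the partition layout is made up for illustration:

```python
from collections import defaultdict

# Two hypothetical "partitions" of (key, 1) pairs, as would be produced
# by rdd.map(lambda kv: (kv[0], 1)) before reduceByKey(lambda a, b: a + b).
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("a", 1), ("b", 1)],
]

def local_combine(partition, func):
    """Map-side combine: at most one record per key leaves each partition."""
    acc = {}
    for k, v in partition:
        acc[k] = func(acc[k], v) if k in acc else v
    return acc

# Each partition now shuffles one record per key instead of every pair.
combined = [local_combine(p, lambda a, b: a + b) for p in partitions]

# Reduce side: merge the per-partition results with the same function.
totals = defaultdict(int)
for part in combined:
    for k, v in part.items():
        totals[k] += v

print(sorted(totals.items()))  # [('a', 3), ('b', 2)]
```

With a list-building groupByKey there is nothing to shrink at the combine step, which is the whole point of the answer above.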
Regarding the relative benefits of reduceByKey(), combineByKey(), or foldByKey(), there is an important conceptual difference which is easier to see when you consider the Scala API signatures.
Both reduceByKey and foldByKey map from RDD[(K, V)] to RDD[(K, V)], while the second one takes an additional zero element.
reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]
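The practical effect of that zero element can be sketched with plain Python's reduce acting on the values of a single key (a local stand-in, using the values for key "a" from the example dataset):

```python
from functools import reduce

values = [7, 8]  # the values for key "a" in the example dataset

# reduceByKey(func): no zero element, func: (V, V) -> V
reduced = reduce(lambda a, b: a + b, values)

# foldByKey(zeroValue)(func): the same func, seeded with the zero element
folded = reduce(lambda a, b: a + b, values, 0)

print(reduced, folded)  # 15 15 -- the value type V never changes
```

In both cases the result type is the same as the input value type, which is exactly why neither signature can turn an Int into a List[Int].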
combineByKey (there is no aggregateByKey here, but it is the same type of transformation) transforms from RDD[(K, V)] to RDD[(K, C)]:
combineByKey[C](
  createCombiner: (V) ⇒ C,
  mergeValue: (C, V) ⇒ C,
  mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
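To see why the V-to-C change matters, here is a hedged sketch of a per-key mean, where V = int and the combiner type C = (sum, count); the local loop stands in for rdd.combineByKey(create_combiner, merge_value, merge_combiners):

```python
data = [("a", 7), ("b", 3), ("a", 8)]

def create_combiner(v):       # (V) -> C: lift a value into a combiner
    return (v, 1)

def merge_value(c, v):        # (C, V) -> C: fold a value into a combiner
    return (c[0] + v, c[1] + 1)

def merge_combiners(c1, c2):  # (C, C) -> C: merge two partitions' combiners
    return (c1[0] + c2[0], c1[1] + c2[1])

# Local stand-in for the per-key combine Spark would perform.
combined = {}
for k, v in data:
    combined[k] = merge_value(combined[k], v) if k in combined else create_combiner(v)

means = {k: s / n for k, (s, n) in combined.items()}
print(sorted(means.items()))  # [('a', 7.5), ('b', 3.0)]
```

A mean cannot be expressed with reduceByKey alone, because the intermediate (sum, count) pair has a different type than the input values.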
Going back to your example, only combineByKey (and in PySpark aggregateByKey) is really applicable, since you are transforming from RDD[(String, Int)] to RDD[(String, List[Int])].
While in a dynamic language like Python it is actually possible to perform such an operation using foldByKey or reduceByKey, it makes the semantics of the code unclear; to cite @tim-peters, "There should be one-- and preferably only one --obvious way to do it" [1].
The difference between aggregateByKey and combineByKey is pretty much the same as that between reduceByKey and foldByKey, so for building a list it is mostly a matter of taste:
def merge_value(acc, x):
    acc.append(x)
    return acc

def merge_combiners(acc1, acc2):
    acc1.extend(acc2)
    return acc1

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .combineByKey(
           lambda x: [x],     # createCombiner
           merge_value,       # mergeValue
           merge_combiners))  # mergeCombiners
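For comparison, a sketch of the aggregateByKey spelling of the same computation, which passes the zero element (an empty list) explicitly instead of deriving it via createCombiner. The merge functions are repeated so the snippet stands alone, and the Spark call is shown as a comment because it assumes a live SparkContext `sc`:

```python
# Same merge functions as the combineByKey example above, repeated so
# this snippet is self-contained.
def merge_value(acc, x):
    acc.append(x)
    return acc

def merge_combiners(acc1, acc2):
    acc1.extend(acc2)
    return acc1

# With a live SparkContext `sc` the equivalent call would be:
# rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
#        .aggregateByKey([], merge_value, merge_combiners))

# Local check of the contract: per-partition accumulation, then a merge,
# as if key "a" saw 7 in one partition and 8 in another.
p1 = merge_value([], 7)
p2 = merge_value([], 8)
print(merge_combiners(p1, p2))  # [7, 8]
```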
In practice you should prefer groupByKey though. The PySpark implementation is significantly more optimized compared to a naive implementation like the one provided above.
1. Peters, T. PEP 20 -- The Zen of Python. (2004). https://www.python.org/dev/peps/pep-0020/
* In practice there is actually quite a lot to lose here, especially when using PySpark. The Python implementation of groupByKey is significantly more optimized than a naive combine-by-key. You can check Be Smart About groupByKey, created by me and @eliasah, for additional discussion.