Why is the fold action necessary in Spark?


Question

I have a silly question involving fold and reduce in PySpark. I understand the difference between these two methods, but, if both require that the applied function is a commutative monoid, I cannot figure out an example in which fold cannot be substituted by reduce.
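For a non-empty RDD and an operation with a neutral zero element, the two do indeed agree; a minimal Scala sketch of the equivalence the question assumes (illustrative only, not part of the original post):

val nums = sc.parallelize(1 to 10)   // non-empty RDD of Ints
nums.reduce(_ + _)                   // 55
nums.fold(0)(_ + _)                  // 55 -- same result, since 0 is neutral for +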

Besides, the PySpark implementation of fold uses acc = op(obj, acc); why is this operation order used instead of acc = op(acc, obj)? (The second order sounds closer to a leftFold to me.)

Cheers,

Thomas

Answer

Empty RDD

reduce cannot be substituted for fold when the RDD is empty:

val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at   
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...

rdd.fold(0)(_ + _)
// Int = 0

You can of course combine reduce with a condition on isEmpty, but it is rather ugly.
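For illustration, a hedged sketch of what that workaround could look like (the guard and the fallback value of 0 are my own, not from the original answer):

// Fall back to a hard-coded default when the RDD has no elements
val sum = if (rdd.isEmpty()) 0 else rdd.reduce(_ + _)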

Mutable buffer

Another use case for fold is aggregation with a mutable buffer. Consider the following RDD:

import breeze.linalg.DenseVector

val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)

Let's say we want a sum of all elements. A naive solution is to simply reduce with +:

rdd.reduce(_ + _)

Unfortunately it creates a new vector for each element. Since object creation and subsequent garbage collection are expensive, it could be better to use a mutable object. That is not possible with reduce (immutability of an RDD doesn't imply immutability of its elements), but it can be achieved with fold as follows:

rdd.fold(DenseVector(0))((acc, x) => acc += x)

The zero element is used here as a mutable buffer that is initialized once per partition, leaving the actual data untouched.
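As an aside, the fact that the zero value is applied once per partition (and once more when merging partition results on the driver) is also why it must be a neutral element. A small illustration, added here as an assumption about the partition layout (4 elements spread over 4 partitions) rather than taken from the original answer:

// Each partition folds starting from the zero value, and the driver folds the
// partition results starting from it again, so a non-neutral zero skews the result:
sc.parallelize(Seq(1, 2, 3, 4), 4).fold(10)(_ + _)
// = 10 (driver) + (10 + 1) + (10 + 2) + (10 + 3) + (10 + 4) = 60, not the 20 one might expect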

acc = op(obj, acc): why is this operation order used instead of acc = op(acc, obj)?

See SPARK-6416 and SPARK-7683.

