Explanation of fold method of Spark RDD


Question

I am running Spark-1.4.0 pre-built for Hadoop-2.4 (in local mode) to calculate the sum of squares of a DoubleRDD. My Scala code looks like

sc.parallelize(Array(2., 3.)).fold(0.0)((p, v) => p+v*v)

And it gave a surprising result: 97.0.

This is counter-intuitive compared to the Scala version of fold:

Array(2., 3.).fold(0.0)((p, v) => p+v*v)

which gives the expected answer 13.0.
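For reference, the local Scala fold simply threads the accumulator through the elements in order:

// p starts at the zero value 0.0:
//   p = 0.0, v = 2.0  =>  0.0 + 2.0 * 2.0 = 4.0
//   p = 4.0, v = 3.0  =>  4.0 + 3.0 * 3.0 = 13.0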

It seems quite likely that I have made some tricky mistake in the code due to a lack of understanding. I have read that the function used in RDD.fold() should be commutative, otherwise the result may depend on the partitioning, etc. For example, if I change the number of partitions to 1,

sc.parallelize(Array(2., 3.), 1).fold(0.0)((p, v) => p+v*v)

the code will give me 169.0 on my machine!
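Here is how 169.0 arises if, as it turns out, the single partition is folded first and the driver then folds the zero value together with the partition result using the same operator (a trace, assuming that merge order):

// Inside the single partition holding Array(2., 3.):
//   0.0 + 2.0 * 2.0 = 4.0
//   4.0 + 3.0 * 3.0 = 13.0            (partition result)
// The driver merges the zero value with the partition result via the same op:
//   0.0 + 13.0 * 13.0 = 169.0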

Can someone explain what exactly is happening here?

Answer

Well, it is actually pretty well explained by the official documentation:

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.

To illustrate what is going on, let's try to simulate it step by step:

val rdd = sc.parallelize(Array(2., 3.))

// Fold each partition on its own, so we can inspect the per-partition
// results before they are combined on the driver.
val byPartition = rdd.mapPartitions(
    iter => Array(iter.fold(0.0)((p, v) => p + v * v)).toIterator).collect()

It gives us something similar to this: Array[Double] = Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0), and

byPartition.reduce((p, v) => (p + v * v))

returns 97.0.
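The arithmetic checks out: the zeros leave the accumulator untouched, while each non-zero partition result gets squared again by the same operator:

// reduce over Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0):
//   zeros:    p stays 0.0
//   v = 4.0:  0.0  + 4.0 * 4.0 = 16.0
//   zeros:    p stays 16.0
//   v = 9.0:  16.0 + 9.0 * 9.0 = 97.0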

An important thing to note is that the result can differ from run to run, depending on the order in which the partitions are combined.
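As a practical aside, one common way to make the result partition-independent is to do the per-element squaring in a map and keep the combining operator associative and commutative:

// Square each element exactly once, then combine with plain addition
// (associative and commutative), so any partitioning yields 13.0.
sc.parallelize(Array(2., 3.)).map(v => v * v).fold(0.0)(_ + _)

// Equivalently, aggregate keeps the per-element and merge operators separate:
sc.parallelize(Array(2., 3.)).aggregate(0.0)((p, v) => p + v * v, _ + _)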

