How to compute cumulative sum using Spark
Question
I have an RDD of (String, Int) which is sorted by key:
val data = Array(("c1", 6), ("c2", 3), ("c3", 4))
val rdd = sc.parallelize(data).sortByKey()
Now I want the value for the first key to start at zero, and each subsequent key's value to be the sum of the values of all previous keys.
E.g.: c1 = 0, c2 = c1's value, c3 = (c1's value + c2's value), c4 = (c1's + ... + c3's values). Expected output:
(c1,0), (c2,6), (c3,9)...
Is it possible to achieve this? I tried it with map, but the sum is not preserved inside the map.
var sum = 0
val t = keycount.map { x =>
  val temp = sum
  sum = sum + x._2
  (x._1, temp)
}
Answer
1. Compute partial results for each partition:
val partials = rdd.mapPartitionsWithIndex((i, iter) => {
  val (keys, values) = iter.toSeq.unzip
  // Running sums within this partition; scanLeft(0) prepends the initial 0.
  val sums = values.scanLeft(0)(_ + _)
  // Emit one element per partition: (keys paired with their running sums, partition total).
  Iterator((keys.zip(sums.tail), sums.last))
})
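To make the output shape concrete, here is a quick check; the partition layout below is an assumption for illustration (suppose sortByKey placed ("c1",6) and ("c2",3) in partition 0 and ("c3",4) in partition 1):

partials.collect().foreach(println)
// Under the assumed layout, this prints one element per partition, e.g.:
//   (List((c1,6), (c2,9)),9)   // running sums within partition 0; partition total 9
//   (List((c3,4)),4)           // running sums within partition 1; partition total 4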
2. Collect the partial sums:
val partialSums = partials.values.collect()
3. Compute the cumulative sum over the partitions and broadcast it:
val sumMap = sc.broadcast(
  // Map each partition index to the sum of the totals of all preceding partitions.
  (0 until rdd.partitions.size)
    .zip(partialSums.scanLeft(0)(_ + _))
    .toMap
)
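Continuing the assumed two-partition example, each partition index gets the total of everything before it as its offset:

println(sumMap.value)
// partialSums == Array(9, 4), so partialSums.scanLeft(0)(_ + _) == Array(0, 9, 13);
// zipping with the partition indices (0 until 2) keeps one offset per partition:
//   Map(0 -> 0, 1 -> 9)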
4. Compute the final results:
val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
  // Shift every within-partition running sum by the offset for this partition.
  val offset = sumMap.value(i)
  if (iter.isEmpty) Iterator()
  else iter.next().map { case (k, v) => (k, v + offset) }.toIterator
})
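A quick end-to-end check, assuming the snippets above were run in order in spark-shell. Note that step 1 zips keys with sums.tail, which gives the inclusive running sum; to reproduce the exclusive sum from the question's expected output, (c1,0), (c2,6), (c3,9), zip with sums.init instead (sums.last, used for the offsets, is unaffected):

result.collect().foreach(println)
// With sums.tail (inclusive running sum):
//   (c1,6)
//   (c2,9)
//   (c3,13)
// With sums.init in step 1 (exclusive running sum, matching the expected output):
//   (c1,0)
//   (c2,6)
//   (c3,9)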