How to compute cumulative sum using Spark


Question

I have an RDD of (String, Int) pairs, sorted by key:

val data = Array(("c1", 6), ("c2", 3), ("c3", 4))
val rdd = sc.parallelize(data).sortByKey()  // the empty argument list is required

Now I want the first key's value to start at zero, and each subsequent key's value to be the sum of all the previous keys' values.

E.g.: c1 = 0, c2 = c1's value, c3 = (c1's value + c2's value), c4 = (c1's + ... + c3's values). Expected output:

(c1,0), (c2,6), (c3,9)...

Is it possible to achieve this? I tried it with map, but the sum is not preserved inside the map.

var sum = 0
// each task mutates its own deserialized copy of `sum`, so the
// running total is never shared across partitions or executors
val t = keycount.map { x => val temp = sum; sum = sum + x._2; (x._1, temp) }
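
The closure is serialized and shipped to each executor, so every task increments a private copy of `sum` and the driver-side variable is never updated. For small data, a minimal driver-side sketch of the intended result (same `rdd` as above; collect-based, so it does not scale):

val (keys, values) = rdd.collect.toSeq.unzip
// zip pairs each key with the sum of all *previous* values,
// dropping the trailing grand total produced by scanLeft
val local = keys.zip(values.scanLeft(0)(_ + _))
// local: Seq((c1,0), (c2,6), (c3,9))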

Answer

  1. Compute partial results for each partition:

val partials = rdd.mapPartitionsWithIndex((i, iter) => {
  val (keys, values) = iter.toSeq.unzip
  val sums = values.scanLeft(0)(_ + _)  // prefix sums within this partition, starting at 0
  // zip drops the trailing element of sums, pairing each key with the sum of the values
  // *before* it (keys.zip(sums.tail) would give inclusive sums, not the expected output)
  Iterator((keys.zip(sums), sums.last))  // (per-key partial sums, partition total)
})
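
To make this concrete, suppose ("c1",6) and ("c2",3) land in partition 0 and ("c3",4) in partition 1 (the actual split depends on `sc.parallelize`; this two-partition assumption is reused in the examples below):

Seq(6, 3).scanLeft(0)(_ + _)  // Seq(0, 6, 9)
// keys.zip(sums) gives Seq((c1,0), (c2,6)); sums.last == 9 is the partition total

Each element of `partials` is therefore a pair of (per-key exclusive prefix sums, partition total).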

  2. Collect the partial sums:

    val partialSums = partials.values.collect
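
Under the two-partition assumption above:

    partialSums  // Array(9, 4): one total per partition, in partition order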
    

  3. Compute the cumulative sum over the partitions and broadcast it:

    val sumMap = sc.broadcast(
      (0 until rdd.partitions.size)
        .zip(partialSums.scanLeft(0)(_ + _))  // offset for partition i = sum of the totals of partitions 0 until i
        .toMap
    )
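
partialSums.scanLeft(0)(_ + _) yields 0, 9, 13; zipping with the partition indices drops the trailing grand total, so under the same assumption:

    sumMap.value  // Map(0 -> 0, 1 -> 9): the offset added within each partition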
    

  4. Compute the final results:

    val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
      val offset = sumMap.value(i)  // total of all preceding partitions
      if (iter.isEmpty) Iterator()
      else iter.next.map { case (k, v) => (k, v + offset) }.toIterator
    })
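
A quick check against the expected output (mapPartitionsWithIndex does not shuffle, so collect returns the pairs in key order here):

    result.collect  // Array((c1,0), (c2,6), (c3,9))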
    

