创建RDD收集迭代计算的结果 [英] Creating an RDD to collect the results of an iterative calculation

查看:207
本文介绍了创建RDD收集迭代计算的结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建一个RDD收集迭代计算的结果。

I would like to create an RDD to collect the results of an iterative calculation .

我如何使用循环(或任何其他),以替换以下code:

How can I use a loop (or any alternative) to replace the following code:

import org.apache.spark.mllib.random.RandomRDDs._    

val n = 10 

val step1 = normalRDD(sc, n, seed = 1 ) 
val step2 = normalRDD(sc, n, seed = (step1.max).toLong ) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc, n, seed = (step2.max).toLong ) 
val result2 = result1.zip(step3) 

...

val step50 = normalRDD(sc, n, seed = (step49.max).toLong ) 
val result49 = result48.zip(step50) 

(创建N个步骤RDDS和结束时一起然后压缩和解也将是只要在50 RDDS创建迭代尊重种子=(步骤确定(N-1)的.max)条件)

(creating the N step RDDs and zipping then together at the end would also be ok as long the 50 RDDs are created iteratively to respect the seed = (step(n-1).max) condition)

推荐答案

一个递归函数将工作:

/**
 * The return type is an Option to handle the case of a user specifying
 * a non positive number of steps.
 */
def createZippedNormal(sc : SparkContext, 
                       numPartitions : Int, 
                       numSteps : Int) : Option[RDD[Double]] = {

  @scala.annotation.tailrec   
  def accum(sc : SparkContext, 
            numPartitions : Int, 
            numSteps : Int, 
            currRDD : RDD[Double], 
            seed : Long) : RDD[Double] = {
    if(numSteps <= 0) currRDD
    else {
      val newRDD = normalRDD(sc, numPartitions, seed) 
      accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max)
    }
  }

  if(numSteps <= 0) None
  else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L))
}

这篇关于创建RDD收集迭代计算的结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆