Iterative code with long lineage RDD causes StackOverflowError in Apache Spark

Problem description

I am a beginner with Apache Spark. I am currently working on a machine learning program that requires iteratively updating an RDD and then collecting nearly 10 KB of data from the executors to the driver. Unfortunately, I get a StackOverflowError when it runs for more than 600 iterations! My code is below. The StackOverflowError is thrown in collectAsMap once the iteration count exceeds 400! Here indexedDevF and indexedData are IndexedRDDs (from the library developed by AMPLab: https://github.com/amplab/spark-indexedrdd).

breakable{
  while(bLow > bHigh + 2*tolerance){
    indexedDevF = indexedDevF.innerJoin(indexedData){(id, a, b) => (b, a)}.mapValues( x => ( x._2 + alphaHighDiff * broad_y.value(iHigh) * kernel(x._1, dataiHigh) + alphaLowDiff * broad_y.value(iLow) * kernel(x._1, dataiLow) ) )
    if (iteration % 50 == 0 ) {
          indexedDevF.checkpoint()
    }
    indexedDevF.persist()  // essential to get correct answer

    val devFMap = indexedDevF.collectAsMap() // ~0.5 s per call according to the web UI (localhost:4040); the StackOverflowError is thrown here

    var min_value = Double.PositiveInfinity
    var max_value = -min_value
    var min_i = -1
    var max_i = -1

    i = 0
    while( i < m ){

      if(((y(i) > 0) && (alpha(i) < cEpsilon)) || ((y(i) < 0) && (alpha(i) > epsilon))){
          if( devFMap(i) <= min_value){
              min_value = devFMap(i)
              min_i = i
          }
      }

      if(((y(i) > 0) && (alpha(i) > epsilon)) || ((y(i) < 0) && (alpha(i) < cEpsilon))){
          if( devFMap(i) >= max_value ){
              max_value = devFMap(i)
              max_i = i
          }
      }
      i = i+1
    }

    iHigh = min_i
    iLow = max_i
    bHigh = devFMap(iHigh)
    bLow = devFMap(iLow) 

    dataiHigh = indexedData.get(iHigh.toLong).get
    dataiLow = indexedData.get(iLow.toLong).get 

    eta = 2 - 2 * kernel(dataiHigh, dataiLow)

    alphaHighOld = alpha(iHigh)
    alphaLowOld = alpha(iLow)
    var alphaDiff = alphaLowOld - alphaHighOld
    var lowLabel = y(iLow)
    var sign = y(iHigh) * lowLabel

    var alphaLowLowerBound = 0D
    var alphaLowUpperBound = 0D

    if (sign < 0){
        if (alphaDiff < 0){
            alphaLowLowerBound = 0;
            alphaLowUpperBound = cost + alphaDiff;
        }
        else{
            alphaLowLowerBound = alphaDiff;
            alphaLowUpperBound = cost;
        }
    }
    else{
        var alphaSum = alphaLowOld + alphaHighOld;
        if (alphaSum < cost){
            alphaLowUpperBound = alphaSum;
            alphaLowLowerBound = 0;
        }
        else{
            alphaLowLowerBound = alphaSum - cost;
            alphaLowUpperBound = cost;
        }
    }

    if (eta > 0){
        alphaLowNew = alphaLowOld + lowLabel*(bHigh - bLow)/eta;
        if (alphaLowNew < alphaLowLowerBound)
            alphaLowNew = alphaLowLowerBound;
        else if (alphaLowNew > alphaLowUpperBound) 
            alphaLowNew = alphaLowUpperBound;
    }
    else{
        var slope = lowLabel * (bHigh - bLow);
        var delta = slope * (alphaLowUpperBound - alphaLowLowerBound);
        if (delta > 0){
            if (slope > 0)  
                alphaLowNew = alphaLowUpperBound;
            else
                alphaLowNew = alphaLowLowerBound;
        }
        else
            alphaLowNew = alphaLowOld;
    }

    alphaLowDiff = alphaLowNew - alphaLowOld;
    alphaHighDiff = -sign*(alphaLowDiff);
    alpha(iLow) = alphaLowNew;
    alpha(iHigh) = (alphaHighOld + alphaHighDiff);


    if(iteration % 50 == 0)
      print(".")

    iteration = iteration + 1;


}

===================

The original question is below. I found that checkpointing was useless and the program still ended with a StackOverflowError! I wrote a simple test program to describe the problem. Fortunately, someone kindly helped me solve it, and you can find the answer below. However, even though checkpointing now really works, I still get a StackOverflowError in my own program :(

for(i <- 1 to 1000){
  a = a.map(x => x+1).persist
  var b = a.collect()
  if(i%100 == 0){
    a.checkpoint()
  }
  print(".")
}

Solution

Looking at the RDD.checkpoint documentation, it says:

This function must be called before any job has been executed on this RDD
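To see why the position of checkpoint() matters, here is a deliberately simplified, hypothetical model in plain Scala (no Spark required). The Node class, its fields and LineageModel.run are invented for illustration and are not Spark's API. The model assumes two behaviours: evaluating an RDD walks its parent chain recursively, so stack depth grows with lineage length; and, matching the documentation quoted above, a checkpoint mark only takes effect if it is set before the first job on that RDD (as we understand it, Spark's post-job checkpoint hook runs at most once per RDD).

```scala
// Hypothetical toy model of RDD lineage and checkpointing; NOT Spark's implementation.
final class Node(val parent: Option[Node]) {
  var markedForCheckpoint = false // set by the model's equivalent of checkpoint()
  var checkpointed = false        // once true, the link to `parent` is treated as cut
  var hookCalled = false          // the post-job checkpoint hook runs at most once per node

  /** Recursion depth needed to evaluate this node (one frame per lineage step). */
  def evalDepth: Int =
    if (checkpointed) 1
    else parent match {
      case Some(p) => 1 + p.evalDepth
      case None    => 1
    }

  /** Post-job hook: honours a mark only on the first job that touches this node. */
  def afterJob(): Unit =
    if (!hookCalled) {
      hookCalled = true
      if (markedForCheckpoint) checkpointed = true
      else parent.foreach(_.afterJob())
    }
}

object LineageModel {
  /** Replays the loop from the question and returns the deepest evaluation seen. */
  def run(iterations: Int, every: Int, checkpointBeforeJob: Boolean): Int = {
    var a = new Node(None) // stands in for the initial RDD
    var maxDepth = 0
    for (i <- 1 to iterations) {
      a = new Node(Some(a)) // a = a.map(x => x + 1)
      if (checkpointBeforeJob && i % every == 0) a.markedForCheckpoint = true
      maxDepth = math.max(maxDepth, a.evalDepth) // a.collect() walks the lineage
      a.afterJob()                               // checkpoint hook after the job
      if (!checkpointBeforeJob && i % every == 0) a.markedForCheckpoint = true
    }
    maxDepth
  }

  def main(args: Array[String]): Unit = {
    println(run(1000, 100, checkpointBeforeJob = false)) // 1001: lineage never truncated
    println(run(1000, 100, checkpointBeforeJob = true))  // 101: bounded by the interval
  }
}
```

In the question's order the mark arrives after the node's first job, so it is never honoured and the depth keeps growing with every iteration; marking before the action caps it at roughly the checkpoint interval.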

And indeed, if you change your code slightly so that the checkpoint is marked before a is collected, it works with no StackOverflowError:

for(i <- 1 to 1000){
  a = a.map(x => x+1).persist

  if(i%100 == 0){
    a.checkpoint()
  }

  var b = a.collect()

  print(".")
}
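For a complete, self-contained version of the corrected loop, here is a sketch under some assumptions: a local master (local[2]), a throwaway temporary directory passed to SparkContext.setCheckpointDir (the snippet above does not show this step, but checkpoint() fails if no checkpoint directory has been set), and an initial RDD of 1 to 10. CheckpointLoop and run are hypothetical names. It calls persist() before checkpoint(), which the RDD.checkpoint documentation recommends so that writing the checkpoint does not recompute the lineage.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CheckpointLoop {
  /** Runs the corrected loop; returns the final values and whether the last RDD was checkpointed. */
  def run(iterations: Int = 1000, every: Int = 100): (Array[Int], Boolean) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("checkpoint-loop").setMaster("local[2]"))
    try {
      // Assumption: a local temp dir is fine for this sketch; on a cluster the
      // checkpoint directory should be on shared storage such as HDFS.
      sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoints").toString)

      var a: RDD[Int] = sc.parallelize(1 to 10)
      var b: Array[Int] = Array.empty
      for (i <- 1 to iterations) {
        // Persist first so the checkpoint write reuses the cached partitions.
        a = a.map(x => x + 1).persist()
        if (i % every == 0) {
          a.checkpoint() // mark BEFORE any job has run on `a`
        }
        b = a.collect() // the action; the marked RDD is written out after this job
        if (i % 50 == 0) print(".")
      }
      (b, a.isCheckpointed)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (values, checkpointed) = run()
    println()
    println(s"${values.mkString(",")} checkpointed=$checkpointed")
  }
}
```

With the defaults, run() should return the values 1001 to 1010 together with checkpointed = true, because the final iteration (1000) is a multiple of 100 and its RDD is marked before collect() runs.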
