Iterative code with a long-lineage RDD causes a StackOverflowError in Apache Spark
Problem description
I am a beginner with Apache Spark. I am currently working on a machine learning program that iteratively updates an RDD and then collects about 10 KB of data from the executors to the driver. Unfortunately, I get a StackOverflowError once it runs past 600 iterations. The following is my code. The StackOverflowError is thrown in the collectAsMap call once the iteration count exceeds 400. Here indexedDevF and indexedData are IndexedRDDs (from the library developed by AMPLab, available at https://github.com/amplab/spark-indexedrdd).
breakable {
  while (bLow > bHigh + 2 * tolerance) {
    // Update devF for every sample; each pass appends a join and a mapValues to the lineage.
    indexedDevF = indexedDevF.innerJoin(indexedData) { (id, a, b) => (b, a) }.mapValues(x =>
      (x._2 + alphaHighDiff * broad_y.value(iHigh) * kernel(x._1, dataiHigh)
        + alphaLowDiff * broad_y.value(iLow) * kernel(x._1, dataiLow))
    )
    if (iteration % 50 == 0) {
      indexedDevF.checkpoint()
    }
    indexedDevF.persist() // essential to get the correct answer
    // Takes about 0.5 s each time according to local:4040; the StackOverflowError is thrown here.
    val devFMap = indexedDevF.collectAsMap()

    // Pick the eligible indices with the smallest and the largest devF values.
    var min_value = Double.PositiveInfinity
    var max_value = -min_value
    var min_i = -1
    var max_i = -1
    i = 0
    while (i < m) {
      if (((y(i) > 0) && (alpha(i) < cEpsilon)) || ((y(i) < 0) && (alpha(i) > epsilon))) {
        if (devFMap(i) <= min_value) {
          min_value = devFMap(i)
          min_i = i
        }
      }
      if (((y(i) > 0) && (alpha(i) > epsilon)) || ((y(i) < 0) && (alpha(i) < cEpsilon))) {
        if (devFMap(i) >= max_value) {
          max_value = devFMap(i)
          max_i = i
        }
      }
      i = i + 1
    }
    iHigh = min_i
    iLow = max_i
    bHigh = devFMap(iHigh)
    bLow = devFMap(iLow)
    dataiHigh = indexedData.get(iHigh.toLong).get
    dataiLow = indexedData.get(iLow.toLong).get
    eta = 2 - 2 * kernel(dataiHigh, dataiLow)
    alphaHighOld = alpha(iHigh)
    alphaLowOld = alpha(iLow)
    var alphaDiff = alphaLowOld - alphaHighOld
    var lowLabel = y(iLow)
    var sign = y(iHigh) * lowLabel

    // Lower and upper bounds used to clip the new alphaLow.
    var alphaLowLowerBound = 0D
    var alphaLowUpperBound = 0D
    if (sign < 0) {
      if (alphaDiff < 0) {
        alphaLowLowerBound = 0
        alphaLowUpperBound = cost + alphaDiff
      } else {
        alphaLowLowerBound = alphaDiff
        alphaLowUpperBound = cost
      }
    } else {
      var alphaSum = alphaLowOld + alphaHighOld
      if (alphaSum < cost) {
        alphaLowUpperBound = alphaSum
        alphaLowLowerBound = 0
      } else {
        alphaLowLowerBound = alphaSum - cost
        alphaLowUpperBound = cost
      }
    }

    // Compute the new alphaLow and clip it to the bounds.
    if (eta > 0) {
      alphaLowNew = alphaLowOld + lowLabel * (bHigh - bLow) / eta
      if (alphaLowNew < alphaLowLowerBound) {
        alphaLowNew = alphaLowLowerBound
      } else if (alphaLowNew > alphaLowUpperBound) {
        alphaLowNew = alphaLowUpperBound
      }
    } else {
      var slope = lowLabel * (bHigh - bLow)
      var delta = slope * (alphaLowUpperBound - alphaLowLowerBound)
      if (delta > 0) {
        if (slope > 0) {
          alphaLowNew = alphaLowUpperBound
        } else {
          alphaLowNew = alphaLowLowerBound
        }
      } else {
        alphaLowNew = alphaLowOld
      }
    }
    alphaLowDiff = alphaLowNew - alphaLowOld
    alphaHighDiff = -sign * (alphaLowDiff)
    alpha(iLow) = alphaLowNew
    alpha(iHigh) = (alphaHighOld + alphaHighDiff)
    if (iteration % 50 == 0) {
      print(".")
    }
    iteration = iteration + 1
  }
} // closes breakable; this brace was missing from the code as posted
===================
The original question follows. I found that checkpointing seemed to have no effect and the program still ended with a StackOverflowError. I wrote a simple test program to illustrate my problem. Fortunately, a kind person helped me solve it; you can find the answer below. However, even though checkpointing now works in the test program, I still get a StackOverflowError in my real program :(
for (i <- 1 to 1000) {
  a = a.map(x => x + 1).persist
  var b = a.collect()
  if (i % 100 == 0) {
    a.checkpoint()
  }
  print(".")
}
Solution
Looking at the RDD.checkpoint documentation, it says:
"This function must be called before any job has been executed on this RDD."
And indeed, if you change your code slightly so that the checkpoint is done before collecting a, it works with no StackOverflowError:
for (i <- 1 to 1000) {
  a = a.map(x => x + 1).persist
  if (i % 100 == 0) {
    a.checkpoint()
  }
  var b = a.collect()
  print(".")
}
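For anyone who wants to try the reordered loop end to end, here is a minimal self-contained sketch. It is not the asker's actual setup: the object name CheckpointBeforeAction, the local[2] master, the input range 1 to 100, and the checkpoint directory /tmp/spark-checkpoints are all assumptions made for illustration. One detail the snippets above leave out is that a checkpoint directory has to be set on the SparkContext before checkpoint() is called.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CheckpointBeforeAction {
  // Applies `iterations` map steps and marks the RDD for checkpointing every
  // `interval` steps, always BEFORE the first action runs on the new RDD.
  def run(sc: SparkContext, iterations: Int, interval: Int): Array[Int] = {
    var a: RDD[Int] = sc.parallelize(1 to 100) // illustrative input, not from the question
    var result = a.collect()
    for (i <- 1 to iterations) {
      a = a.map(x => x + 1).persist()
      if (i % interval == 0) {
        a.checkpoint() // no job has run on this `a` yet, as the documentation requires
      }
      result = a.collect() // first action: fills the cache and writes the checkpoint if marked
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("checkpoint-before-action").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical path; must be set before checkpoint()
    try {
      val out = run(sc, iterations = 1000, interval = 100)
      println(out.take(3).mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

The interval of 100 simply mirrors the test loop in the question; a smaller interval keeps the lineage shorter at the price of more checkpoint writes.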
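If a larger program still overflows after the reordering, it may help to check whether the checkpoint is really taking effect. The sketch below is only a diagnostic idea, not a fix for the IndexedRDD code in the question: it uses the standard RDD members isCheckpointed and toDebugString to report whether the lineage was cut at each checkpoint. The object name LineageCheck, the helper names lineageLines and probe, the input data, and the checkpoint path are hypothetical, and the line count of toDebugString is used only as a rough proxy for lineage depth.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

object LineageCheck {
  // Rough proxy for lineage depth: number of lines in the lineage description.
  def lineageLines(rdd: RDD[_]): Int = rdd.toDebugString.split("\n").length

  // Records (iteration, isCheckpointed, lineage lines) just before and at each checkpoint.
  def probe(sc: SparkContext, iterations: Int, interval: Int): Seq[(Int, Boolean, Int)] = {
    var a: RDD[Int] = sc.parallelize(1 to 100) // illustrative input
    val report = ArrayBuffer.empty[(Int, Boolean, Int)]
    for (i <- 1 to iterations) {
      a = a.map(_ + 1).persist()
      if (i % interval == 0) a.checkpoint() // marked before the first action on `a`
      a.count()                             // the action that triggers the checkpoint
      if (i % interval == 0 || i % interval == interval - 1) {
        report += ((i, a.isCheckpointed, lineageLines(a)))
      }
    }
    report.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lineage-check").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical path
    try {
      probe(sc, iterations = 200, interval = 50).foreach { case (i, done, lines) =>
        println(s"iteration $i: checkpointed=$done, lineage lines=$lines")
      }
    } finally {
      sc.stop()
    }
  }
}
```

If the checkpoint works, the lineage line count should drop sharply at each checkpointed iteration compared with the iteration just before it; a count that keeps growing suggests the checkpoint is not being applied to the RDD that the action actually runs on.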