Stackoverflow due to long RDD Lineage
Problem description
I have thousands of small files in HDFS. I need to process a slightly smaller subset of them (still in the thousands); fileList contains the list of file paths that need to be processed.
// fileList == list of file paths in HDFS
var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD
for (i <- 0 until fileList.size()) {
  val filePath = fileList.get(i)
  val fileRDD = sparkContext.textFile(filePath)
  val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line))
  masterRDD = masterRDD.union(sampleRDD)
}
masterRDD.first()
// Once out of the loop, performing any action results in a StackOverflowError due to the long lineage of the RDD
Exception in thread "main" java.lang.StackOverflowError
at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
=====================================================================
=====================================================================
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
In general, you can use checkpoints to break long lineages. Something more or less similar to this should work:
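import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Number of unions between checkpoints; pick a value that suits your job
val checkpointInterval: Int = ???

// Load one file and tag every matching line with the file's path
def loadAndFilter(path: String) = sc.textFile(path)
  .filter(_.startsWith("#####"))
  .map((path, _))

// Union the next RDD into the accumulator and mark every interval-th result for local checkpointing.
// Note: localCheckpoint only marks the RDD; its lineage is truncated once a job has computed it.
def mergeWithLocalCheckpoint[T: ClassTag](interval: Int)
    (acc: RDD[T], xi: (RDD[T], Int)) = {
  if (xi._2 % interval == 0 && xi._2 > 0) xi._1.union(acc).localCheckpoint
  else xi._1.union(acc)
}

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)]
fileList.map(loadAndFilter).zipWithIndex
  .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))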
In this particular situation, a much simpler solution should be to use the SparkContext.union method:
val masterRDD = sc.union(
  fileList.map(path => sc.textFile(path)
    .filter(_.startsWith("#####"))
    .map((path, _)))
)
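Both the fold and the sc.union call use fileList.map, which assumes fileList is a Scala collection, while the question calls fileList.size(), which suggests a java.util.List. If that is the case, a conversion along these lines may be needed first. This is only a sketch: the object name, javaFileList and the sample paths are hypothetical stand-ins, not part of the original code.

```scala
import java.util.{Arrays, List => JList}
import scala.collection.JavaConverters._ // on Scala 2.13+ scala.jdk.CollectionConverters._ is the newer equivalent

object FileListConversion {
  // Hypothetical stand-in for the question's Java list of HDFS paths
  val javaFileList: JList[String] =
    Arrays.asList("/data/a.txt", "/data/b.txt", "/data/c.txt")

  // asScala wraps the Java list; toList copies it into an immutable Scala List
  val fileList: List[String] = javaFileList.asScala.toList

  // Comma-separated form, the shape of argument that textFile / wholeTextFiles accept for multiple paths
  val commaSeparated: String = fileList.mkString(",")
}
```

With the converted list, fileList.map(...) and fileList.mkString(",") behave as the answer expects.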
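The difference between these approaches should be obvious when you look at the DAG generated by the loop / reduce:
(figure: DAG generated by loop / reduce)
and by a single union:
(figure: DAG generated by a single union)
The loop wraps each new UnionRDD around the previous one, so the lineage gets one level deeper per file, and resolving partitions has to recurse through every level (these are the repeating UnionRDD.getPartitions frames in the stack trace). A single sc.union creates one UnionRDD with all inputs as direct parents.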
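To make the depth argument concrete without a cluster, here is a toy model in plain Scala. It does not use Spark at all: Node, Source, Union and LineageModel are hypothetical names that only imitate how unions reference their parents, so treat it as an illustration of the shape of the two lineages rather than of Spark's internals.

```scala
object LineageModel {
  // Toy stand-ins for RDDs: a source has no parents, a union points at its parents
  sealed trait Node
  final case class Source(name: String) extends Node
  final case class Union(parents: Seq[Node]) extends Node

  // Number of nested levels a recursive walk over the lineage has to descend
  def depth(node: Node): Int = node match {
    case Source(_)      => 1
    case Union(parents) => 1 + parents.map(depth).foldLeft(0)((a, b) => math.max(a, b))
  }

  private def sources(n: Int): Seq[Node] = (1 to n).map(i => Source(s"file-$i"))

  // Loop / reduce style: every step wraps the previous result in a new two-parent union
  def chained(n: Int): Node =
    sources(n).foldLeft(Union(Seq.empty): Node)((acc, src) => Union(Seq(acc, src)))

  // sc.union style: a single union whose parents are all the sources
  def flat(n: Int): Node = Union(sources(n))
}
```

In this model depth(chained(n)) is n + 1, so it grows with the number of files, while depth(flat(n)) stays at 2 for any n >= 1.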
Of course, if the files are small you can combine wholeTextFiles with flatMap and read all files at once:
sc.wholeTextFiles(fileList.mkString(","))
  .flatMap { case (path, text) =>
    text.split("\n").filter(_.startsWith("#####")).map((path, _))
  }
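The per-file logic inside that flatMap can be tried out on plain strings before running it on the cluster. A minimal sketch, assuming a helper named MarkerLines.extract (a hypothetical name, not part of Spark or of the answer) that takes the same (path, text) pair wholeTextFiles produces:

```scala
object MarkerLines {
  // Mirrors the flatMap body: split the file content into lines, keep those
  // starting with the marker, and pair each one with the path it came from
  def extract(path: String, text: String, marker: String = "#####"): List[(String, String)] =
    text.split("\n").toList.filter(_.startsWith(marker)).map(line => (path, line))
}
```

Because each file arrives as a single string, this only makes sense when every file comfortably fits in memory, which is the "files are small" condition mentioned above.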