StackOverflowError due to long RDD lineage

Problem description

I have thousands of small files in HDFS and need to process a slightly smaller subset of them (again in the thousands). fileList contains the list of file paths that need to be processed.

// fileList == list of filepaths in HDFS

var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD

for (i <- 0 to fileList.size() - 1) {

val filePath = fileList.get(i)
val fileRDD = sparkContext.textFile(filePath)
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line)) 

masterRDD = masterRDD.union(sampleRDD)

}

masterRDD.first()

// Once out of the loop, performing any action results in a StackOverflowError due to the long RDD lineage

Exception in thread "main" java.lang.StackOverflowError
    at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    =====================================================================
    =====================================================================
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

Solution

In general you can use checkpoints to break long lineages. Something more or less similar to this should work:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

val checkpointInterval: Int = ???

def loadAndFilter(path: String) = sc.textFile(path)
  .filter(_.startsWith("#####"))
  .map((path, _))

def mergeWithLocalCheckpoint[T: ClassTag](interval: Int)
  (acc: RDD[T], xi: (RDD[T], Int)) = {
    // Truncate the lineage every `interval` merges; otherwise just union.
    if (xi._2 % interval == 0 && xi._2 > 0) xi._1.union(acc).localCheckpoint
    else xi._1.union(acc)
  }

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)]
fileList.map(loadAndFilter).zipWithIndex
  .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))
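
localCheckpoint truncates the lineage using data cached on the executors, so it is cheap but not fault tolerant: if an executor is lost, the truncated partitions cannot be recomputed. If that matters, a minimal sketch of the same merge function using reliable checkpoints instead would look like this (the checkpoint directory below is a hypothetical, writable HDFS path):

sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")  // hypothetical path; must be writable

def mergeWithReliableCheckpoint[T: ClassTag](interval: Int)
  (acc: RDD[T], xi: (RDD[T], Int)) = {
    val merged = xi._1.union(acc)
    if (xi._2 % interval == 0 && xi._2 > 0) {
      merged.checkpoint()  // only marks the RDD; data is written during the next action
      merged.count()       // force that action so the lineage is actually cut here
    }
    merged
  }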

In this particular situation a much simpler solution should be to use SparkContext.union method:

val masterRDD = sc.union(
  fileList.map(path => sc.textFile(path)
    .filter(_.startsWith("#####"))
    .map((path, _))) 
)

The difference between these methods becomes obvious when you look at the DAGs they generate: the loop / fold version wraps one UnionRDD around another for every file, so the lineage grows as deep as the number of files, while a single union produces one flat UnionRDD with all of the per-file RDDs as direct parents.
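
A quick way to see this without the DAG visualizations is to print both lineages with RDD.toDebugString; a small sketch, reusing sc, fileList and loadAndFilter from above:

val empty: RDD[(String, String)] = sc.emptyRDD
val looped = fileList.map(loadAndFilter).foldLeft(empty)(_ union _)
val flat   = sc.union(fileList.map(loadAndFilter))

println(looped.toDebugString) // one UnionRDD nested inside another for every file
println(flat.toDebugString)   // a single UnionRDD with every per-file RDD as a direct parent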

Of course, if the files are small, you can combine wholeTextFiles with flatMap and read all the files at once:

sc.wholeTextFiles(fileList.mkString(","))  // each file becomes a single (path, content) record
  .flatMap { case (path, text) =>
    text.split("\n").filter(_.startsWith("#####")).map((path, _))
  }
