Huge memory consumption in Map Task in Spark


Problem description

I have a lot of files that contain roughly 60,000,000 lines. All of my files use the format {timestamp}#{producer}#{messageId}#{data_bytes}\n

I walk through my files one by one and also want to build one output file per input file. Because some of the lines depend on previous lines, I grouped them by their producer. Whenever a line depends on one or more previous lines, those lines always come from the same producer. After grouping all of the lines, I give them to my Java parser. The parser then holds all parsed data objects in memory and outputs them as JSON afterwards.

To visualize how I think my job is processed, I threw together the following "flow graph". Note that I did not visualize the groupByKey shuffling process.

My problems:

  • I expected Spark to split up the files, process the splits with separate tasks, and save each task's output to a "part" file. However, my tasks run out of memory and get killed by YARN before they can finish: Container killed by YARN for exceeding memory limits. 7.6 GB of 7.5 GB physical memory used (a sketch of the memory settings this message refers to follows after this list).
  • My Parser keeps all parsed data objects in memory. I can't change the code of the Parser.
  • Please note that my code works for smaller files (for example two files with 600,000 lines each as the input to my job).
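
For context on the YARN message above: the limit being enforced is the executor memory plus the configured memory overhead. The sketch below shows the knobs involved (the values are made up and the property names are the Spark 1.x/2.x ones); raising them only lifts the ceiling and would not fix a parser that holds an entire group in memory.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only; tune them to your cluster.
val sparkConf = new SparkConf()
    .setAppName("log-file-parser")
    .set("spark.executor.memory", "6g")                  // JVM heap per executor
    .set("spark.yarn.executor.memoryOverhead", "1536")   // off-heap headroom in MB that YARN also counts
val sc = new SparkContext(sparkConf)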

My questions:

  1. How can I make sure that Spark will create a result for every file-split in my map task? (Maybe they will if my tasks succeed but I will never see the output as of now.)
  2. I thought that my map transformation val lineMap = lines.map ... (see Scala code below) produces a partitioned rdd. Thus I expect the values of the rdd to be split in some way before calling my second map task.

    Furthermore, I thought that calling saveAsTextFile on this rdd lineMap would produce an output task that runs after each of my map tasks has finished. If my assumptions are correct, why do my executors still run out of memory? Is Spark doing several (too) big file splits and processing them concurrently, which leads to the Parser filling up the memory?

  3. Is repartitioning the lineMap rdd to get more (smaller) inputs for my Parser a good idea? (See the sketch after this list.)
  4. Is there an additional reducer step somewhere that I am not aware of? Like results being aggregated before getting written to file, or similar?
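
For reference, here is a rough sketch of what question 3 asks about, using the names from the code below; the partition counts are arbitrary. Note that this only spreads different producers across more tasks: after groupByKey, all lines of a single producer still land in one task, so one very large group can still exhaust an executor.

// Option A: ask groupByKey for more partitions directly
val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey(200)

// Option B: repartition the grouped rdd before the parsing map (costs another shuffle)
val parsedLines = lineMap.repartition(200)
    .map { case (key, values) => parseLinesByKey(key, values, config) }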


Scala code (I left out irrelevant code parts):

def main(args: Array[String]) {
    val inputFilePath = args(0)
    val outputFilePath = args(1)

    val inputFiles = fs.listStatus(new Path(inputFilePath))
    inputFiles.foreach( filename => {
        processData(filename.getPath, ...)
    }) 
}


def processData(filePath: Path, ...) {
    val lines  = sc.textFile(filePath.toString())
    val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey()

    val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) }
    //each output should be saved separately
    parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)     
}


def parseLinesByKey(key: String, values: Iterable[String], config : Config) = {
    val importer = new LogFileImporter(...)
    importer.parseData(values.toIterator.asJava, ...)

    //importer from now contains all parsed data objects in memory that could be parsed 
    //from the given values.  

    val jsonMapper = getJsonMapper(...)
    val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject)

    (key, jsonStringData)
}

Solution

I fixed this by removing the groupByKey call and implementing a new FileInputFormat as well as a RecordReader to remove my limitation that lines depend on other lines. For now, I implemented it so that each split contains a 50,000-byte overhead from the previous split. This ensures that all lines that depend on previous lines can be parsed correctly.

I will now go ahead and still look through the last 50,000 bytes of the previous split, but only copy over lines that actually affect the parsing of the current split. Thus, I minimize the overhead and still get a highly parallelizable task.
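
To make the overhead idea concrete, here is a minimal fragment of how such a reader could compute its effective range inside initialize(); the names are hypothetical, this is not the author's actual implementation, and the ae.be-based code further below takes a different, escaped-newline approach.

// inside a custom RecordReader's initialize(), with overhead = 50000L bytes
val split = inputSplit.asInstanceOf[FileSplit]
val readFrom = 0L.max(split.getStart - overhead)  // reach back into the previous split
val readTo = split.getStart + split.getLength     // normal end of this split
// lines between readFrom and split.getStart serve as context for the parser only;
// lines between split.getStart and readTo are the records this task is responsible for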

The following links pointed me in the right direction. Because the topic of FileInputFormat/RecordReader is quite complicated at first sight (it was for me, at least), it is good to read through these articles and understand whether this is suitable for your problem or not:

Relevant code parts from the ae.be article, just in case the website goes down. The author (@Gurdt) uses this to detect whether a chat message contains an escaped line return (the line ends with "\") and appends the escaped lines together until an unescaped \n is found. This allows him to retrieve messages that span two or more lines. The code, written in Scala:

Usage

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

val conf = new Configuration(sparkContext.hadoopConfiguration)
val rdd = sparkContext.newAPIHadoopFile("data.txt", classOf[MyFileInputFormat],
    classOf[LongWritable], classOf[Text], conf)
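
The values in this rdd are Hadoop Text objects, and record readers are allowed to reuse those objects between records, so a typical follow-up step (an assumption about usage, not part of the original article) is to convert them to plain Strings right away:

val messages = rdd.map { case (_, text) => text.toString }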

FileInputFormat

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

class MyFileInputFormat extends FileInputFormat[LongWritable, Text] {
    override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
    RecordReader[LongWritable, Text] = new MyRecordReader()
}

RecordReader

import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.util.LineReader

class MyRecordReader() extends RecordReader[LongWritable, Text] {
    var start, end, pos = 0L  // the methods below use `pos`, so the field is declared with that name
    var reader: LineReader = null
    var key = new LongWritable
    var value = new Text

    override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
        // split position in data (start one byte earlier to detect if
        // the split starts in the middle of a previous record)
        val split = inputSplit.asInstanceOf[FileSplit]
        start = 0L.max(split.getStart - 1)
        end = start + split.getLength

        // open a stream to the data, pointing to the start of the split
        val stream = split.getPath.getFileSystem(context.getConfiguration)
        .open(split.getPath)
        stream.seek(start)
        reader = new LineReader(stream, context.getConfiguration)

        // if the split starts at a newline, we want to start yet another byte
        // earlier to check if the newline was escaped or not
        val firstByte = stream.readByte().toInt
        if(firstByte == '\n')
            start = 0L.max(start - 1)
        stream.seek(start)

        if(start != 0)
            skipRemainderFromPreviousSplit(reader)
    }

    def skipRemainderFromPreviousSplit(reader: LineReader): Unit = {
        var readAnotherLine = true
        while(readAnotherLine) {
            // read next line
            val buffer = new Text()
            start += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)
            pos = start

            // detect if delimiter was escaped
            readAnotherLine = buffer.getLength >= 1 && // something was read
            buffer.charAt(buffer.getLength - 1) == '\\' && // newline was escaped
            pos <= end // seek head hasn't passed the split
        }
    }

    override def nextKeyValue(): Boolean = {
        key.set(pos)

        // read newlines until an unescaped newline is read
        var lastNewlineWasEscaped = false
        while (pos < end || lastNewlineWasEscaped) {
            // read next line
            val buffer = new Text
            pos += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)

            // append newly read data to previous data if necessary
            value = if(lastNewlineWasEscaped) new Text(value + "\n" + buffer) else buffer

            // detect if delimiter was escaped
            lastNewlineWasEscaped = buffer.charAt(buffer.getLength - 1) == '\\'

            // let Spark know that a key-value pair is ready!
            if(!lastNewlineWasEscaped)
                return true
        }

        // end of split reached?
        return false
    }
}

Note: You might need to implement getCurrentKey, getCurrentValue, close and getProgress in your RecordReader as well.
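
One possible way to fill in those remaining methods, reusing the fields defined in MyRecordReader above (a sketch, not part of the original answer):

    override def getCurrentKey: LongWritable = key
    override def getCurrentValue: Text = value
    override def getProgress: Float =
        if (end == start) 1.0f else math.min(1.0f, (pos - start).toFloat / (end - start))
    override def close(): Unit = if (reader != null) reader.close()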
