How does Hadoop process records split across block boundaries?


Question


According to Hadoop: The Definitive Guide

The logical records that FileInputFormats define do not usually fit neatly into HDFS blocks. For example, a TextInputFormat’s logical records are lines, which will cross HDFS boundaries more often than not. This has no bearing on the functioning of your program—lines are not missed or broken, for example—but it’s worth knowing about, as it does mean that data-local maps (that is, maps that are running on the same host as their input data) will perform some remote reads. The slight overhead this causes is not normally significant.

Suppose a record line is split across two blocks (b1 and b2). The mapper processing the first block (b1) will notice that the last line doesn't have an EOL separator and will fetch the remainder of the line from the next block of data (b2).

How does the mapper processing the second block (b2) determine that the first record is incomplete and that it should start processing from the second record in the block (b2)?

Solution

Interesting question, I spent some time looking at the code for the details and here are my thoughts. The splits are computed on the client side by InputFormat.getSplits, so a look at FileInputFormat gives the following info:

  • For each input file, get the file length, the block size and calculate the split size as max(minSize, min(maxSize, blockSize)) where maxSize corresponds to mapred.max.split.size and minSize is mapred.min.split.size.
  • Divide the file into different FileSplits based on the split size calculated above. What's important here is that each FileSplit is initialized with a start parameter corresponding to the offset in the input file. There is still no handling of the lines at that point. The relevant part of the code looks like this:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
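
For a concrete feel of how this loop carves a file into splits, here is a minimal standalone sketch (plain Java, not the real Hadoop classes: the simplified Split record is an invention of this example, since the real FileSplit also carries the path and the host list, and the 200 MB file / 64 MB block numbers are borrowed from the worked example at the end of this answer). SPLIT_SLOP is 1.1 in FileInputFormat, which allows the last split to be up to 10% larger than the nominal split size.

    import java.util.ArrayList;
    import java.util.List;

    public class SplitSketch {
        // Same slack factor as FileInputFormat: the remainder is only broken off
        // into further splits while it is more than 10% larger than splitSize.
        static final double SPLIT_SLOP = 1.1;

        // Simplified stand-in for FileSplit: just an offset and a length.
        record Split(long start, long length) {}

        // The formula quoted above: max(minSize, min(maxSize, blockSize)).
        static long computeSplitSize(long blockSize, long minSize, long maxSize) {
            return Math.max(minSize, Math.min(maxSize, blockSize));
        }

        static List<Split> getSplits(long fileLength, long splitSize) {
            List<Split> splits = new ArrayList<>();
            long bytesRemaining = fileLength;
            while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                splits.add(new Split(fileLength - bytesRemaining, splitSize));
                bytesRemaining -= splitSize;
            }
            if (bytesRemaining != 0) {              // last, possibly smaller, split
                splits.add(new Split(fileLength - bytesRemaining, bytesRemaining));
            }
            return splits;
        }

        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            // 64 MB blocks, no min/max overrides: split size == block size.
            long splitSize = computeSplitSize(64 * mb, 1L, Long.MAX_VALUE);
            // A 200 MB file gives splits starting at 0, 64, 128 and 192 MB,
            // the last one only 8 MB long -- the Split 1..4 of the scenario below.
            getSplits(200 * mb, splitSize).forEach(System.out::println);
        }
    }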
    

After that, if you look at the LineRecordReader which is defined by the TextInputFormat, that's where the lines are handled:

  • When you initialize your LineRecordReader, it tries to instantiate a LineReader, which is an abstraction for reading lines from an FSDataInputStream. There are 2 cases:
  • If there is a CompressionCodec defined, then this codec is responsible for handling boundaries. Probably not relevant to your question.
  • If there is no codec, however, that's where things get interesting: if the start of your InputSplit is different from 0, you backtrack 1 character and then skip the first line you encounter, identified by \n or \r\n (Windows)! The backtrack is important because, in case your line boundaries are the same as the split boundaries, it ensures you do not skip a valid line (see the toy illustration after the code below). Here is the relevant code:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
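
To see why the one-byte backtrack matters, here is a toy, in-memory illustration (an assumption-heavy sketch, not the real LineReader: it pretends the whole file is a byte array and that every line ends with a single '\n'; the helper name firstRecordOffset is invented for this example).

    public class SkipFirstLineDemo {

        // Mimics the rule in the snippet above: if the split does not start at
        // offset 0, back up one byte (--start; seek(start)) and discard everything
        // up to and including the next '\n'. Returns the offset of the first
        // record this mapper will actually emit.
        static int firstRecordOffset(byte[] data, int splitStart) {
            if (splitStart == 0) {
                return 0;                          // the first split never skips anything
            }
            int pos = splitStart - 1;              // the backtrack
            while (pos < data.length && data[pos] != '\n') {
                pos++;                             // skip the (possibly partial) first line
            }
            return Math.min(pos + 1, data.length); // first byte after the skipped newline
        }

        public static void main(String[] args) {
            byte[] data = "aaaa\nbbbb\ncccc\n".getBytes();
            // Boundary in the middle of "bbbb": the partial line is skipped and this
            // mapper starts at "cccc" (offset 10); the previous mapper finishes "bbbb".
            System.out.println(firstRecordOffset(data, 7));  // prints 10
            // Boundary exactly at the start of "bbbb" (offset 5): thanks to the
            // backtrack, pos lands on the '\n' that ends "aaaa", so only that newline
            // is consumed and "bbbb" is NOT lost.
            System.out.println(firstRecordOffset(data, 5));  // prints 5
        }
    }

Without the backtrack, the second call would return 10 as well and "bbbb" would be silently dropped, which is exactly the case the --start in the real code guards against.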
    

So since the splits are calculated on the client, the mappers don't need to run in sequence; every mapper already knows whether it needs to discard the first line or not.

So basically, if you have 2 lines of 100 MB each in the same file and, to simplify, the split size is 64 MB, then when the input splits are calculated we will have the following scenario (each start below is length - bytesRemaining from the loop shown earlier):

  • Split 1, containing the path and the hosts of this block, initialized at start 200-200 = 0 MB, length 64 MB.
  • Split 2 initialized at start 200-200+64 = 64 MB, length 64 MB.
  • Split 3 initialized at start 200-200+128 = 128 MB, length 64 MB.
  • Split 4 initialized at start 200-200+192 = 192 MB, length 8 MB.
  • Mapper A will process split 1; start is 0, so it does not skip the first line, and it reads a full line that goes beyond the 64 MB limit, so it needs a remote read.
  • Mapper B will process split 2; start is != 0, so it skips the first line after 64 MB - 1 byte, which corresponds to the end of line 1 at 100 MB, still within split 2. Split 2 holds 28 MB of line 2, so the remaining 72 MB are read remotely.
  • Mapper C will process split 3; start is != 0, so it skips the first line after 128 MB - 1 byte, which corresponds to the end of line 2 at 200 MB, which is the end of the file, so it does nothing.
  • Mapper D is the same as mapper C, except that it looks for a newline after 192 MB - 1 byte.
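
The scenario above can also be checked mechanically. The sketch below is a toy model under the same assumptions (a 200 MB file whose two lines end at 100 MB and 200 MB, 64 MB splits; the class and helper names are invented for this example). It applies the two rules discussed above: a non-first split backs up one byte and skips to the start of the next line, and a mapper emits every line that begins before the end of its split, finishing the last one even if it crosses the boundary.

    import java.util.ArrayList;
    import java.util.List;

    public class WhoReadsWhat {

        // Start of the first record after position p: the first byte following the
        // next '\n' at or after p (lineEnds holds the offsets just past each '\n').
        static long skipToNextLine(long[] lineEnds, long p) {
            for (long e : lineEnds) {
                if (e > p) return e;
            }
            return lineEnds[lineEnds.length - 1];    // p is past the last line: EOF
        }

        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            long fileLen = 200 * mb, splitSize = 64 * mb;
            long[] lineEnds = {100 * mb, 200 * mb};  // two 100 MB lines, as in the example

            // Offsets at which lines begin: 0 plus every line end except the last (EOF).
            List<Long> lineStarts = new ArrayList<>();
            lineStarts.add(0L);
            for (int i = 0; i < lineEnds.length - 1; i++) lineStarts.add(lineEnds[i]);

            for (long start = 0; start < fileLen; start += splitSize) {
                long end = Math.min(start + splitSize, fileLen);
                // Rule 1: a non-first split backs up one byte and skips to the next line.
                long recordStart = (start == 0) ? 0 : skipToNextLine(lineEnds, start - 1);
                // Rule 2: the mapper emits every line whose start lies in [recordStart, end);
                // the last such line is read to completion even if it crosses 'end'.
                long lines = lineStarts.stream()
                                       .filter(s -> s >= recordStart && s < end)
                                       .count();
                System.out.printf("split @ %3d MB: first record at %3d MB, lines emitted: %d%n",
                                  start / mb, recordStart / mb, lines);
            }
        }
    }

Running it prints one line emitted for splits 1 and 2 and none for splits 3 and 4, matching mappers A through D above.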
