How does Hadoop process records split across block boundaries?


Question

According to Hadoop - The Definitive Guide:


The logical records that FileInputFormats define do not usually fit neatly into HDFS blocks. For example, a TextInputFormat’s logical records are lines, which will cross HDFS boundaries more often than not. This has no bearing on the functioning of your program—lines are not missed or broken, for example—but it’s worth knowing about, as it does mean that data-local maps (that is, maps that are running on the same host as their input data) will perform some remote reads. The slight overhead this causes is not normally significant.


Suppose a record line is split across two blocks (b1 and b2). The mapper processing the first block (b1) will notice that the last line doesn't have an EOL separator and will fetch the remainder of the line from the next block of data (b2).


How does the mapper processing the second block (b2) determine that the first record is incomplete and should process starting from the second record in the block (b2)?

Answer


Interesting question. I spent some time looking at the code for the details, and here are my thoughts. The splits are handled by the client in InputFormat.getSplits, so a look at FileInputFormat gives the following info:


  • For each input file, get the file length and the block size, and calculate the split size as max(minSize, min(maxSize, blockSize)), where maxSize corresponds to mapred.max.split.size and minSize to mapred.min.split.size (a small standalone sketch of this calculation follows the code excerpt below).
  • Divide the file into different FileSplits based on the split size calculated above. What's important here is that each FileSplit is initialized with a start parameter corresponding to the offset in the input file. There is still no handling of the lines at that point. The relevant part of the code looks like this:

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {   // SPLIT_SLOP is 1.1
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  // the start offset of this split is length - bytesRemaining
  splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                           blkLocations[blkIndex].getHosts()));
  bytesRemaining -= splitSize;
}
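
As a rough, standalone illustration of these two steps (this is not Hadoop code; the class name, the Mb-based units and the default-looking values for minSize and maxSize are assumptions made to keep the sketch readable), the following computes the split size with max(minSize, min(maxSize, blockSize)) and then enumerates the split start offsets the same way the loop above does:

public class SplitSizeSketch {
    public static void main(String[] args) {
        // all sizes are expressed in Mb to keep the output readable
        long minSize = 1;                  // stands in for mapred.min.split.size
        long maxSize = Long.MAX_VALUE;     // stands in for mapred.max.split.size
        long blockSize = 64;
        long length = 200;                 // a 200Mb input file, as in the scenario further down

        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));   // -> 64

        final double SPLIT_SLOP = 1.1;     // same slop factor FileInputFormat uses
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            System.out.println("split: start=" + (length - bytesRemaining) + "Mb length=" + splitSize + "Mb");
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {         // the last, possibly shorter, split
            System.out.println("split: start=" + (length - bytesRemaining) + "Mb length=" + bytesRemaining + "Mb");
        }
    }
}

Running this prints four splits starting at 0, 64, 128 and 192Mb, the last one only 8Mb long, which is exactly the scenario worked through at the end of this answer.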


After that, if you look at the LineRecordReader, which is defined by the TextInputFormat, that's where the lines are handled:


  • When you initialize your LineRecordReader, it tries to instantiate a LineReader, which is an abstraction for reading lines over an FSDataInputStream. There are 2 cases:
  • If there is a CompressionCodec defined, then this codec is responsible for handling boundaries. Probably not relevant to your question.
  • If there is no codec however, that's where things are interesting: if the start of your InputSplit is different than 0, then you backtrack 1 character and then skip the first line you encounter, identified by \n or \r\n (Windows)! The backtrack is important because in case your line boundaries are the same as the split boundaries, this ensures you do not skip a valid line. Here is the relevant code (a simplified, standalone simulation of this rule follows it):

if (codec != null) {
   // with a codec the split is the whole file, so read until the end of the stream
   in = new LineReader(codec.createInputStream(fileIn), job);
   end = Long.MAX_VALUE;
} else {
   if (start != 0) {
     skipFirstLine = true;
     --start;            // back up one byte so a line ending exactly at the split start is not lost
     fileIn.seek(start);
   }
   in = new LineReader(fileIn, job);
}
if (skipFirstLine) {  // skip first line and re-establish "start".
  start += in.readLine(new Text(), 0,
                    (int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
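
To make the rule concrete, here is a small in-memory simulation (my own sketch, not the real LineRecordReader; the class and method names are invented, and only \n is handled, whereas the real LineReader also deals with \r\n): if start != 0 it backs up one byte and discards everything up to and including the first '\n', then it emits every line whose first byte lies before end, letting the last one run past the split boundary.

import java.util.ArrayList;
import java.util.List;

public class RecordBoundarySketch {

    // Simplified imitation of the behaviour described above, on an in-memory byte array.
    static List<String> readSplit(byte[] data, long start, long end) {
        List<String> lines = new ArrayList<>();
        int pos = (int) start;
        if (start != 0) {
            pos--;                                           // backtrack one byte
            while (pos < data.length && data[pos] != '\n') pos++;
            pos++;                                           // skip the (possibly partial) first line
        }
        while (pos < end && pos < data.length) {             // keep reading while still before 'end'
            int lineStart = pos;
            while (pos < data.length && data[pos] != '\n') pos++;
            lines.add(new String(data, lineStart, pos - lineStart));
            pos++;                                           // step over the '\n'
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "aaaaaaaaaa\nbbbbbbbbbb\n".getBytes();  // two 10-byte lines, 22 bytes total

        // Boundary in the middle of line 1: the first "mapper" reads past its end
        // (the remote read), the second one skips the partial first line.
        System.out.println(readSplit(data, 0, 8));    // [aaaaaaaaaa]
        System.out.println(readSplit(data, 8, 22));   // [bbbbbbbbbb]

        // Boundary exactly at the first byte of line 2: the one-byte backtrack is what
        // keeps the second "mapper" from throwing line 2 away.
        System.out.println(readSplit(data, 0, 11));   // [aaaaaaaaaa]
        System.out.println(readSplit(data, 11, 22));  // [bbbbbbbbbb]
    }
}

In both cases the two "mappers" together emit each line exactly once, which is the guarantee the passage quoted from the book refers to.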


So since the splits are calculated in the client, the mappers don't need to run in sequence; every mapper already knows whether it needs to discard the first line or not.


So basically, if you have 2 lines of 100Mb each in the same file, and to simplify let's say the split size is 64Mb, then when the input splits are calculated we will have the following scenario (a small sketch that double-checks this arithmetic follows the list):

  • Split 1, containing the path and the hosts for this block, initialized with start 200-200=0Mb and length 64Mb.
  • Split 2, initialized with start 200-200+64=64Mb and length 64Mb.
  • Split 3, initialized with start 200-200+128=128Mb and length 64Mb.
  • Split 4, initialized with start 200-200+192=192Mb and length 8Mb.
  • Mapper A will process split 1; start is 0, so it does not skip the first line, and it reads a full line, which goes beyond the 64Mb limit, so it needs remote reads.
  • Mapper B will process split 2; start is != 0, so it skips the first line after 64Mb-1 byte, which corresponds to the end of line 1 at 100Mb, still inside split 2. Split 2 holds 28Mb of line 2, so the remaining 72Mb are read remotely.
  • Mapper C will process split 3; start is != 0, so it skips the first line after 128Mb-1 byte, which corresponds to the end of line 2 at 200Mb. That is the end of the file, so it does nothing.
  • Mapper D is the same as mapper C, except that it looks for a newline after 192Mb-1 byte.
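
As a sanity check of the bullets above, here is a tiny sketch (the class name and the Mb-based bookkeeping are my own; the rule "a line is emitted by the mapper whose split contains the line's first byte" is just a restatement of the mechanism described earlier) that maps each split to the line(s) its mapper ends up emitting:

public class ScenarioSketch {
    public static void main(String[] args) {
        // everything in Mb: a 200Mb file with two 100Mb lines and four splits of 64/64/64/8Mb
        long[] lineEnds = {100, 200};                                   // exclusive end of line 1 and line 2
        long[][] splits = {{0, 64}, {64, 128}, {128, 192}, {192, 200}}; // [start, end) per split

        for (int s = 0; s < splits.length; s++) {
            long start = splits[s][0], end = splits[s][1];
            String emitted = "";
            long lineStart = 0;
            for (int l = 0; l < lineEnds.length; l++) {
                // a line is emitted by the mapper whose split contains the line's first byte
                if (lineStart >= start && lineStart < end) {
                    emitted += " line " + (l + 1);
                }
                lineStart = lineEnds[l];
            }
            System.out.println("mapper for split " + (s + 1) + " emits:" +
                    (emitted.isEmpty() ? " nothing" : emitted));
        }
    }
}

It prints that the mapper for split 1 emits line 1, the mapper for split 2 emits line 2, and the mappers for splits 3 and 4 emit nothing, matching mappers A through D above.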
