它的Java文件执行Hadoop的HDFS文件拆分成块 [英] Which Java file does Hadoop HDFS file splitting into Blocks

查看:590
本文介绍了它的Java文件执行Hadoop的HDFS文件拆分成块的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

大家都知道,当从本地文本文件被复制到HDFS,该文件被分成固定大小128 MB。例如,当我复制一个256MB的文本文件到HDFS,将会有2个街区,包含了分裂的文件(128分之256)。

As we all know when a text file from local is being copied into the HDFS, the file is being split into a fixed size 128 MB. For example, when I copy a 256 MB text file into HDFS, there will be 2 blocks (256/128) that contain the "splitted" file.

有人可以告诉我的Java / jar文件在的Hadoop 2.7.1 源$ C ​​$ C不拆分文件到块,其中的Java的功能/ jar文件写入到块该Datanode的目录。

Can Someone please tell me which java/jar file in the Hadoop 2.7.1 source code does the functionality of splitting the file into blocks and which java/jar file writes the blocks into the datanode's directory.

帮我跟踪这个code。

Help me trace this code.

我只找到一个,他们做了逻辑输入分割为这是在FileInputFormat.java发现,这是不是我所需要的块。我需要的物理文件被分裂Java文件。

I only found the one where they did the logical input splits for blocks which is found in the FileInputFormat.java and that is not what I need. I need the java file for the physical file being split.

推荐答案

在code将数据写入到的DataNodes是2个文件present:

The code for writing data into DataNodes is present in 2 files:


  • DFSOutputStream.java (包: org.apache.hadoop.hdfs

由客户端写入的数据被分成数据包(通常64k的大小的)。当一个数据包准备好,得到的数据将排队数据队列,这是由 DataStreamer 回升。

The data written by client is split into packets (typically of 64k size). When a packet of data is ready, the data gets enqueued into a Data Queue, which is picked up by the DataStreamer.

DataStreamer (包: org.apache.hadoop.hdfs

有拾取在数据队列中的数据包并把它们在管道发送到数据节点(典型地有3数据节点由于3复制因子在数据管道,)。

It picks up the packets in the Data Queue and sends them to the Data Nodes in the pipeline (typically there are 3 Data Nodes in a data pipeline, because of replication factor of 3).

据检索新的块ID和开始流数据,以数据节点。当数据块被写入时,会关闭当前的块并得到一个新的块写入下一组数据包。

It retrieves a new block ID and starts streaming the data to Data Nodes. When a block of data is written, it closes the current block and gets a new block for writing next set of packets.

在code,其中一个新块得到的,是如下:

The code, where a new block is got, is below:

// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
  if(LOG.isDebugEnabled()) {
    LOG.debug("Allocating new block");
  }
  setPipeline(nextBlockOutputStream());
  initDataStreaming();
}

在code,其中当前块被关闭,低于:

The code, where the current block gets closed, is below:

// Is this block full?
if (one.isLastPacketInBlock()) {
  // wait for the close packet has been acked
  synchronized (dataQueue) {
    while (!shouldStop() && ackQueue.size() != 0) {
      dataQueue.wait(1000);// wait for acks to arrive from datanodes
    }
  }
  if (shouldStop()) {
    continue;
  }

  endBlock();
}

端块()方法,再次在舞台设置为:

In the endBlock() method, again the stage is set to:

stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;

这意味着,针对下一组数据包写入新的块中创建一个新的渠道。

which means, a new pipeline is created for writing next set of packets to a new Block.

编辑:?如何检测块的结束

由于 DataStreamer 保存数据附加到一个块,它更新写入的字节数。

As DataStreamer keeps appending data to a block, it updates the number of bytes written.

/**
  * increase bytes of current block by len.
  *
  * @param len how many bytes to increase to current block
  */
void incBytesCurBlock(long len) {
    this.bytesCurBlock += len;
}

它还保存检查,如果写入的字节数等于块大小:

It also keeps checking, if the number of bytes written is equal to the blocksize:

// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
    getStreamer().getBytesCurBlock() == blockSize) {
  enqueueCurrentPacketFull();
}

如果达到块大小

在上面的语句中,以下条件检查:

In the statement above, following condition checks if the blocksize is reached:

getStreamer().getBytesCurBlock() == blockSize)

如果遇到块边界,那么端块()方法被调用:

If the block boundary is encountered, then endBlock() method gets called:

/**
 * if encountering a block boundary, send an empty packet to
 * indicate the end of block and reset bytesCurBlock.
 *
 * @throws IOException
 */
protected void endBlock() throws IOException {
    if (getStreamer().getBytesCurBlock() == blockSize) {
      setCurrentPacketToEmpty();
      enqueueCurrentPacket();
      getStreamer().setBytesCurBlock(0);
      lastFlushOffset = 0;
    }
}

这将确保,当前块被关闭,并从名称节点获得一个新的块写入数据。

This will ensure that, the current block gets closed and a new block is obtained from Name Node for writing the data.

块大小是由 dfs.blocksize 确定参数 HDFS-site.xml中文件(它被定128 MB在我的群集= 134217728):

The block size is determined by dfs.blocksize parameter in hdfs-site.xml file (it is set to 128 MB = 134217728 in my cluster):

<property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
    <description>The default block size for new files, in bytes.
        You can use the following suffix (case insensitive): k(kilo),
        m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
        size (such as 128k, 512m, 1g, etc.), Or provide complete size
        in bytes (such as 134217728 for 128 MB).
    </description>
</property>

这篇关于它的Java文件执行Hadoop的HDFS文件拆分成块的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆