在hadoop中将文件读取为单个记录 [英] Reading file as single record in hadoop

查看:113
本文介绍了在hadoop中将文件读取为单个记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有大量的小文件,我想使用CombineFileInputFormat合并这些文件,使得每个文件的数据在MR作业中作为单个记录。
我参照了 http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html ,并尝试将其转换为新的API

我面临2个问题:



a)我只用2个小文件进行测试,仍然启动了2个mapper。我预期只有1个



b)每一行都是单记录,我希望整个文件为单记录。



这可能有些麻烦,但请看下面的代码。我在hadoop方面还是个新手



驱动程序类

public class MRDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        FileSystem fs = new Path(".").getFileSystem(getConf());
        fs.printStatistics();
        Job job = new Job(getConf());
        job.setJobName("Enron MR");
        job.setMapperClass(EnronMailReadMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(0);
        job.setJarByClass(EnronMailReadMapper.class);
        RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MRDriver(), args);
        System.exit(exitCode);
    }

}

下面的类基本上是LineRecordReader的复制粘贴,只修改了initialize()和nextKeyValue()函数

  public class SingleFileRecordReader extends RecordReader< LongWritable,Text> {
private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

私人起步较长;
私人持仓;
私人长期结束;
专用LineReader中;
private int maxLineLength;
private LongWritable key = null;
私人文本值=空;

public void initialize(InputSplit genericSplit,
TaskAttemptContext context)throws IOException {
FileSplit split =(FileSplit)genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(mapred.linerecordreader.maxlength,
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
最终路径文件= split.getPath();

//打开文件并寻找分割的开始
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());

fileIn.seek(start);
in = new LineReader(fileIn,job);
//如果这不是第一次分割,我们总是扔掉第一条记录
//因为我们总是(除了最后一个分割)在
中读取一行额外的行// next()方法。
if(start!= 0){
start + = in.readLine(new Text(),0,maxBytesToConsume(start));
}
this.pos = start;
}

private int maxBytesToConsume(long pos){
return(int)Math.min(Integer.MAX_VALUE,end - pos);
}

private long getFilePosition()throws IOException {
long retVal = pos;
返回retVal;
}

public boolean nextKeyValue()throws IOException {
if(key == null){
key = new LongWritable();
}
key.set(pos);
if(value == null){
value = new Text();
}
int newSize = 0;
StringBuffer totalValue = new StringBuffer();
//我们总是读取一条额外的行,它位于
上限以外(即,(end - 1))
while(getFilePosition()<= end){
newSize = in.readLine(value,maxLineLength,
Math.max(maxBytesToConsume(pos),maxLineLength));
if(newSize == 0){
break;
}
totalValue.append(value.toString()+\\\
);
pos + = newSize;
if(newSize break;
}

//线路太长。再试一次
LOG.info(posped line + size+ newSize +at pos+
(pos - newSize));
}
if(newSize == 0){
key = null;
value = null;
返回false;
} else {
value = new Text(totalValue.toString());
返回true;
}
}

@Override
public LongWritable getCurrentKey(){
return key;
}

@Override
public Text getCurrentValue(){
return value;
}
$ b $ **
*获取拆分过程中的进度
* /
public float getProgress()throws IOException {
if (start == end){
return 0.0f;
} else {
return Math.min(1.0f,
(getFilePosition() - start)/(float)(end - start));


$ b $ public void close()throws IOException {
try {
if(in!= null){
in 。关();

}终于{b
$ b}
}



}



其他文件

public class RawCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) split, context, MultiFileRecordReader.class);
    }

}

并且

  public class MultiFileRecordReader extends RecordReader< LongWritable,文字> {

私人CombineFileSplit拆分;
private TaskAttemptContext context;
private int index;
私人RecordReader< LongWritable,文字> RR;

public MultiFileRecordReader(CombineFileSplit split,TaskAttemptContext context,Integer index){
this.split = split;
this.context = context;
this.index = index;
this.rr = new SingleFileRecordReader();

@Override
public void initialize(InputSplit split,TaskAttemptContext context)
throws IOException,InterruptedException {
this.split =(CombineFileSplit)split;
this.context = context;

if(null == rr){
rr = new SingleFileRecordReader();
}

FileSplit fileSplit = new FileSplit(this.split.getPath(index),
this.split.getOffset(index),
this.split.getLength (index),
this.split.getLocations());
this.rr.initialize(fileSplit,this.context);


$ @覆盖
public boolean nextKeyValue()抛出IOException,InterruptedException {
// TODO自动生成的方法存根
返回this.rr.nextKeyValue();

$ b @Override
public LongWritable getCurrentKey()抛出IOException,InterruptedException {
// TODO自动生成的方法存根
返回this.rr. getCurrentKey();
}
$ b @Override
public Text getCurrentValue()throws IOException,InterruptedException {
// TODO自动生成的方法存根
返回this.rr. getCurrentValue();

$ b @Override
public float getProgress()throws IOException,InterruptedException {
// TODO自动生成的方法存根
返回this.rr. getProgress();

$ b @Override
public void close()throws IOException {
if(rr!= null){
rr.close();
rr = null;
}
}

}



记录由WholeFileRecordReaders构建。

public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    /**
     * 创建一个CombineFileRecordReader来读取分配给此InputSplit的每个文件。
     * 请注意,与普通InputSplit不同,split必须是CombineFileSplit,因此
     * 预计会指定多个文件。
     *
     * @param split 要读取的InputSplit。如果它不是CombineFileSplit,
     *        则抛出IllegalArgumentException。
     * @param context 此任务的上下文。
     * @return 一个CombineFileRecordReader,用于处理split中的每个文件。
     *         它将使用WholeFileRecordReader读取每个文件。
     * @throws IOException 如果发生错误。
     */
    @Override
    public RecordReader<NullWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {

        if (!(split instanceof CombineFileSplit)) {
            throw new IllegalArgumentException("split must be a CombineFileSplit");
        }
        return new CombineFileRecordReader<NullWritable, Text>((CombineFileSplit) split, context, WholeFileRecordReader.class);
    }

}

上面用到的WholeFileRecordReader如下所示:

  public class WholeFileRecordReader extends RecordReader< NullWritable,Text> {
private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);

/ **要读取的文件的路径。 * /
private final Path mFileToRead;
/ **这个文件的长度。 * /
private final long mFileLength;

/ **配置。 * /
private final配置mConf;

/ **是否已经处理了此FileSplit。 * /
私人布尔mProcessed;
/ **单个文本存储当前文件的文件名。 * /
// private final Text mFileName;
/ **单个文本在读取时存储该文件的值(值)。 * /
private final Text mFileText;

/ **
*实现细节:该构造函数的构建方式是通过CombineFileRecordReader中的
*反射来调用。
*
* @param fileSplit将读取的CombineFileSplit。
* @param上下文此任务的上下文。
* @param pathToProcess来自CombineFileSplit的路径索引在此记录中处理。
* /
public WholeFileRecordReader(CombineFileSplit fileSplit,TaskAttemptContext context,
Integer pathToProcess){
mProcessed = false;
mFileToRead = fileSplit.getPath(pathToProcess);
mFileLength = fileSplit.getLength(pathToProcess);
mConf = context.getConfiguration();

assert 0 == fileSplit.getOffset(pathToProcess);
if(LOG.isDebugEnabled()){
LOG.debug(FileToRead is:+ mFileToRead.toString());
LOG.debug(Processing path+ pathToProcess +out of+ fileSplit.getNumPaths());

尝试{
FileSystem fs = FileSystem.get(mConf);
断言fs.getFileStatus(mFileToRead).getLen()== mFileLength;
} catch(IOException ioe){
//好吧,我只是在测试。
}
}

// mFileName = new Text();
mFileText = new Text();

$ b / ** {@inheritDoc} * /
@Override $ b $ public void close()throws IOException {
mFileText.clear();
}

/ **
*返回当前文件的绝对路径。
*
* @return当前文件的绝对路径。
* @引发IOException从不。
* @throws InterruptedException永远不会。
* /
@Override
public NullWritable getCurrentKey()throws IOException,InterruptedException {
return NullWritable.get();
}

/ **
*< p>返回当前值。如果通过调用NextKeyValue()来读取文件,
*将以BytesWritable的形式返回文件的内容。否则,它将返回一个
*空的BytesWritable。< / p>
*
*< p>如果initialize()未先被调用,则会抛出IllegalStateException异常。< / p>
*
* @return包含要读取的文件内容的BytesWritable。
* @引发IOException从不。
* @throws InterruptedException永远不会。
* /
@Override
public Text getCurrentValue()throws IOException,InterruptedException {
return mFileText;
}

/ **
*返回文件是否已被处理。由于文件只会生成一个记录
*,因此如果未处理,则进度将为0.0,如果已经处理,则进度为
*和1.0。
*
* @return 0.0如果文件没有被处理。 1.0如果有。
* @引发IOException从不。
* @throws InterruptedException永远不会。
* /
@Override
public float getProgress()throws IOException,InterruptedException {
return(mProcessed)? (float)1.0:(float)0.0;
}

/ **
*所有的内部状态已经在实例化时设置。这是一个没有操作。
*
* @param分割InputSplit以进行读取。没用过。
* @param上下文此任务的上下文。没用过。
* @引发IOException从不。
* @throws InterruptedException永远不会。
* /
@Override
public void initialize(InputSplit split,TaskAttemptContext context)
throws IOException,InterruptedException {
// no-op。
}

/ **
*< p>如果文件尚未被读取,则将其读入内存中,以便将
* getCurrentValue()将以文本形式返回此文件的全部内容,
*和getCurrentKey()将以文本的形式返回此文件的合格路径。然后,返回
* true。如果它已被读取,则返回false而不更新任何内部状态。< / p>
*
* @return文件是否被读取。
*如果读取文件时发生错误,则抛出IOException。
*如果发生错误,则引发InterruptedException。
* /
@Override
public boolean nextKeyValue()throws IOException,InterruptedException {
if(!mProcessed){
if(mFileLength>(long)Integer。 MAX_VALUE){
抛出新的IOException(文件长于Integer.MAX_VALUE。);
}
byte [] contents = new byte [(int)mFileLength];

FileSystem fs = mFileToRead.getFileSystem(mConf);
FSDataInputStream in = null;
尝试{
//设置此文件的内容。
in = fs.open(mFileToRead);
IOUtils.readFully(in,contents,0,contents.length);
mFileText.set(contents,0,contents.length);

} finally {
IOUtils.closeStream(in);
}
mProcessed = true;
返回true;
}
返回false;
}

}

以下是您的驱动程序代码: -

public int run(String[] arg) throws Exception {
    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);
    // 估算reducer数量
    Job job = new Job(conf);
    job.setJarByClass(WholeFileDriver.class);
    job.setJobName("WholeFile");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(WholeFileInputFormat.class);
    job.setMapperClass(WholeFileMapper.class);
    job.setNumReduceTasks(0);

    FileInputFormat.addInputPath(job, new Path(arg[0]));
    Path output = new Path(arg[1]);
    try {
        fs.delete(output, true);
    } catch (IOException e) {
        LOG.warn("Failed to delete temporary path", e);
    }
    FileOutputFormat.setOutputPath(job, output);

    boolean ret = job.waitForCompletion(true);
    if (!ret) {
        throw new Exception("Job Failed");
    }


I have a huge number of small files, and I want to use CombineFileInputFormat to merge them such that each file's data comes as a single record in my MR job. I have followed http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html and tried to convert it to the new API.

I am facing 2 problems:

a) I am just testing it with 2 small files, still 2 mappers are fired. I expected 1

b) Each line is coming as single record, i want the whole file as single record.

It may be painful, but please look at the code below. I am still a novice in Hadoop.

The driver class

/**
 * Command-line driver that runs the Enron mail job as a map-only MapReduce job.
 *
 * <p>Input is read through {@link RawCombineFileInputFormat}; output is written
 * with {@link TextOutputFormat}.
 */
public class MRDriver extends Configured implements Tool {

    /**
     * Configures the job, submits it and blocks until it completes.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be configured, submitted or monitored
     */
    @Override
    public int run(String[] args) throws Exception {
        FileSystem fs = new Path(".").getFileSystem(getConf());
        fs.printStatistics();

        Job job = new Job(getConf());
        job.setJobName("Enron MR");
        job.setJarByClass(EnronMailReadMapper.class);
        job.setMapperClass(EnronMailReadMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(0);

        // The input format must be registered on the job explicitly. Calling the
        // inherited static addInputPath() through RawCombineFileInputFormat only
        // records the path; without this line the job falls back to the default
        // line-oriented input format (one mapper per file, one record per line).
        job.setInputFormatClass(RawCombineFileInputFormat.class);
        RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Entry point: runs the driver through {@link ToolRunner} and exits the JVM
     * with the code returned by {@link #run(String[])}.
     *
     * @param args command-line arguments forwarded to {@link #run(String[])}
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MRDriver(), args);
        System.exit(exitCode);
    }

}

The below class is mostly copy paste of LineRecordReader with modification to initialize() & nextKeyValue() function

/**
 * Record reader that returns the text of one {@link FileSplit} as a single record.
 *
 * <p>The key is the byte offset at which the record starts. The value is every
 * line of the split, each followed by {@code "\n"}. After that one record has
 * been returned, {@link #nextKeyValue()} returns {@code false}.
 */
public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;

  /**
   * Opens the file behind the split and positions the reader at the split start.
   *
   * @param genericSplit the split to read; must be a {@link FileSplit}
   * @param context task context supplying the configuration
   * @throws IOException if the file cannot be opened or positioned
   */
  @Override
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // Open the file and seek to the start of the split.
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(file);
    boolean ready = false;
    try {
      fileIn.seek(start);
      in = new LineReader(fileIn, job);
      // If this is not the first split, throw away the first (possibly partial)
      // line: the reader of the preceding split reads one line past its end.
      if (start != 0) {
        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
      }
      ready = true;
    } finally {
      if (!ready) {
        // Do not leak the stream when positioning fails.
        in = null;
        fileIn.close();
      }
    }
    this.pos = start;
  }

  /** Returns the number of bytes left in the split from {@code pos}, capped at {@code Integer.MAX_VALUE}. */
  private int maxBytesToConsume(long pos) {
    return (int) Math.min(Integer.MAX_VALUE, end - pos);
  }

  /** Returns the current byte position within the file. */
  private long getFilePosition() {
    return pos;
  }

  /**
   * Reads all remaining lines of the split into one record.
   *
   * <p>Lines that reach {@code maxLineLength} are skipped and logged.
   *
   * @return {@code true} if at least one line was consumed and a record is
   *         available; {@code false} once the split is exhausted
   * @throws IOException if reading from the underlying stream fails
   */
  @Override
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);

    Text line = new Text();
    StringBuilder contents = new StringBuilder();
    boolean consumedAny = false;

    // Like LineRecordReader, read up to and including the line that starts at
    // or before the split end, so the following split can drop its first line.
    while (getFilePosition() <= end) {
      int newSize = in.readLine(line, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break; // end of file
      }
      pos += newSize;
      consumedAny = true;
      if (newSize >= maxLineLength) {
        // Line too long: leave it out of the record and keep reading.
        LOG.info("Skipped line of size " + newSize + " at pos " +
                 (pos - newSize));
        continue;
      }
      contents.append(line.toString()).append('\n');
    }

    if (!consumedAny) {
      key = null;
      value = null;
      return false;
    }
    value = new Text(contents.toString());
    return true;
  }

  /** Returns the start offset of the current record, or {@code null} if there is none. */
  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  /** Returns the text of the current record, or {@code null} if there is none. */
  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Gets the progress within the split.
   *
   * @return a value between 0.0 and 1.0; 0.0 when the split is empty
   */
  @Override
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    }
    return Math.min(1.0f,
        (getFilePosition() - start) / (float) (end - start));
  }

  /** Closes the underlying line reader, if one was opened. */
  @Override
  public synchronized void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }

}

Other files

/**
 * Combine-file input format whose records are produced, file by file, by
 * {@link MultiFileRecordReader}.
 */
public class RawCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    /**
     * Builds a {@link CombineFileRecordReader} that instantiates one
     * {@link MultiFileRecordReader} per file of the given split.
     *
     * @param split the split to read; cast to {@link CombineFileSplit}
     * @param context the task context
     * @return the combining record reader
     * @throws IOException if the reader cannot be created
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        final CombineFileSplit combined = (CombineFileSplit) split;
        return new CombineFileRecordReader<LongWritable, Text>(
                combined, context, MultiFileRecordReader.class);
    }

}

And

/**
 * Reads one file of a {@link CombineFileSplit} by delegating every call to a
 * {@link SingleFileRecordReader} initialized on that file's portion of the split.
 */
public class MultiFileRecordReader extends RecordReader<LongWritable, Text> {

    private CombineFileSplit combineSplit;
    private TaskAttemptContext taskContext;
    private int fileIndex;
    private RecordReader<LongWritable, Text> delegate;

    /**
     * Creates a reader for one file of a combined split.
     *
     * @param split the combined split
     * @param context the task context
     * @param index index of the file within {@code split} that this reader handles
     */
    public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
        this.combineSplit = split;
        this.taskContext = context;
        this.fileIndex = index;
        this.delegate = new SingleFileRecordReader();
    }

    /**
     * Builds a {@link FileSplit} for the file at this reader's index and
     * initializes the delegate reader on it.
     *
     * @param split the combined split; cast to {@link CombineFileSplit}
     * @param context the task context
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        combineSplit = (CombineFileSplit) split;
        taskContext = context;

        // The delegate is null after close(); recreate it in that case.
        if (delegate == null) {
            delegate = new SingleFileRecordReader();
        }

        final FileSplit singleFile = new FileSplit(
                combineSplit.getPath(fileIndex),
                combineSplit.getOffset(fileIndex),
                combineSplit.getLength(fileIndex),
                combineSplit.getLocations());
        delegate.initialize(singleFile, taskContext);
    }

    /** Advances the delegate reader and reports whether a record is available. */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return delegate.nextKeyValue();
    }

    /** Returns the delegate reader's current key. */
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return delegate.getCurrentKey();
    }

    /** Returns the delegate reader's current value. */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return delegate.getCurrentValue();
    }

    /** Returns the delegate reader's progress. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return delegate.getProgress();
    }

    /** Closes the delegate reader, if any, and drops the reference to it. */
    @Override
    public void close() throws IOException {
        if (delegate == null) {
            return;
        }
        delegate.close();
        delegate = null;
    }

}

解决方案

Take a look at this input format. This is an input format for reading multiple files in a single map task. Exactly one (unsplit) file will be read for each record passed to the mapper. The WholeFileRecordReader takes care of sending one file's content as one value. The key returned is NullWritable and the value is the content of each file as a whole. Now you can use this to run your MapReduce job, see how many mappers actually run, and check whether the output you are getting is correct.

Records are constructed from WholeFileRecordReaders.

    /**
     * Input format that packs several files into one split and never splits an
     * individual file. Records are constructed by {@link WholeFileRecordReader}.
     */
    public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

        /**
         * Reports every file as non-splittable.
         *
         * @param context the job context (unused)
         * @param file the file being considered (unused)
         * @return always {@code false}
         */
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }

        /**
         * Creates a {@link CombineFileRecordReader} that reads each file assigned
         * to the split with a {@link WholeFileRecordReader}.
         *
         * @param split the split to read; must be a {@link CombineFileSplit}
         * @param context the context for this task
         * @return a reader that processes each file in {@code split}
         * @throws IllegalArgumentException if {@code split} is not a {@link CombineFileSplit}
         * @throws IOException if there is an error
         */
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException {
            if (split instanceof CombineFileSplit) {
                final CombineFileSplit combined = (CombineFileSplit) split;
                return new CombineFileRecordReader<NullWritable, Text>(
                        combined, context, WholeFileRecordReader.class);
            }
            throw new IllegalArgumentException("split must be a CombineFileSplit");
        }

    }

The WholeFileRecordReader used above is as follows:

/**
 * Record reader that emits the entire content of one file of a
 * {@link CombineFileSplit} as a single {@link Text} value with a
 * {@link NullWritable} key.
 */
public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);

    /** The path to the file to read. */
    private final Path mFileToRead;
    /** The length of this file, as recorded in the split. */
    private final long mFileLength;

    /** The Configuration. */
    private final Configuration mConf;

    /** Whether this file has been processed. */
    private boolean mProcessed;
    /** Single Text holding the content of this file once it has been read. */
    private final Text mFileText;

    /**
     * Implementation detail: this constructor is built to be called via
     * reflection from within CombineFileRecordReader.
     *
     * @param fileSplit The CombineFileSplit that this will read from.
     * @param context The context for this task.
     * @param pathToProcess The path index from the CombineFileSplit to process in this record.
     */
    public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
        Integer pathToProcess) {
        mProcessed = false;
        mFileToRead = fileSplit.getPath(pathToProcess);
        mFileLength = fileSplit.getLength(pathToProcess);
        mConf = context.getConfiguration();

        assert 0 == fileSplit.getOffset(pathToProcess);
        if (LOG.isDebugEnabled()) {
            LOG.debug("FileToRead is: " + mFileToRead.toString());
            LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths());

            try {
                // Resolve the file system from the path itself (as nextKeyValue()
                // does) so the check also works when the file does not live on
                // the default file system.
                FileSystem fs = mFileToRead.getFileSystem(mConf);
                assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
            } catch (IOException ioe) {
                // Best-effort sanity check only; record the failure and carry on.
                LOG.debug("Could not verify the length of " + mFileToRead, ioe);
            }
        }

        mFileText = new Text();
    }

    /** Clears the buffered file content. */
    @Override
    public void close() throws IOException {
        mFileText.clear();
    }

    /**
     * Returns the key of the current record. This reader does not produce keys.
     *
     * @return {@code NullWritable.get()}.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    /**
     * Returns the current value. If the file has been read with a call to
     * {@link #nextKeyValue()}, this is the content of the file as a Text;
     * otherwise it is an empty Text.
     *
     * @return A Text containing the content of the file.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return mFileText;
    }

    /**
     * Returns whether the file has been processed or not. Since only one record
     * will be generated for a file, progress will be 0.0 if it has not been processed,
     * and 1.0 if it has.
     *
     * @return 0.0 if the file has not been processed. 1.0 if it has.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return mProcessed ? 1.0f : 0.0f;
    }

    /**
     * All of the internal state is already set on instantiation. This is a no-op.
     *
     * @param split The InputSplit to read. Unused.
     * @param context The context for this task. Unused.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
        // no-op.
    }

    /**
     * If the file has not already been read, reads it fully into memory so that
     * {@link #getCurrentValue()} returns its entire content as Text, then returns
     * true. If it has already been read, returns false without updating any
     * internal state.
     *
     * @return Whether the file was read by this call.
     * @throws IOException if the file is longer than Integer.MAX_VALUE bytes or
     *         there is an error reading it.
     * @throws InterruptedException never.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (mProcessed) {
            return false;
        }
        if (mFileLength > (long) Integer.MAX_VALUE) {
            throw new IOException("File is longer than Integer.MAX_VALUE.");
        }
        byte[] contents = new byte[(int) mFileLength];

        FileSystem fs = mFileToRead.getFileSystem(mConf);
        FSDataInputStream in = null;
        try {
            // Set the contents of this file.
            in = fs.open(mFileToRead);
            IOUtils.readFully(in, contents, 0, contents.length);
            mFileText.set(contents, 0, contents.length);
        } finally {
            IOUtils.closeStream(in);
        }
        mProcessed = true;
        return true;
    }

}

The following is your driver code:-

public int run(String[] arg) throws Exception {
    Configuration conf=getConf();
    FileSystem fs = FileSystem.get(conf);
    //estimate reducers
    Job job = new Job(conf);
    job.setJarByClass(WholeFileDriver.class);
    job.setJobName("WholeFile");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(WholeFileInputFormat.class);
    job.setMapperClass(WholeFileMapper.class);
    job.setNumReduceTasks(0);

    FileInputFormat.addInputPath(job, new Path(arg[0]));
    Path output=new Path(arg[1]);
    try {
        fs.delete(output, true);
    } catch (IOException e) {
        LOG.warn("Failed to delete temporary path", e);
    }
    FileOutputFormat.setOutputPath(job, output);

    boolean ret=job.waitForCompletion(true);
    if(!ret){
        throw new Exception("Job Failed");
    }

这篇关于在hadoop中将文件读取为单个记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆