Getting Filename/FileData as key/value input for Map when running a Hadoop MapReduce Job


Problem description

I went through the question "How to get Filename/File Contents as key/value input for MAP when running a Hadoop MapReduce Job?". Though it explains the concept, I am unable to successfully translate it into working code.

Basically, I want the file name as the key and the file data as the value. For that I wrote a custom RecordReader, as recommended in the aforementioned question, but I couldn't understand how to get the file name as the key in that class. Also, while writing the custom FileInputFormat class, I couldn't understand how to return the custom RecordReader I wrote earlier.

The RecordReader code is:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class CustomRecordReader extends RecordReader<Text, Text> {

    private static final String LINE_SEPARATOR = System.getProperty("line.separator");

    private StringBuffer valueBuffer = new StringBuffer("");
    private Text key = new Text();
    private Text value = new Text();
    private RecordReader<Text, Text> recordReader;

    public SPDRecordReader(RecordReader<Text, Text> recordReader) {
        this.recordReader = recordReader;
    }

    @Override
    public void close() throws IOException {
        recordReader.close();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return recordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit arg0, TaskAttemptContext arg1)
            throws IOException, InterruptedException {
        recordReader.initialize(arg0, arg1);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (valueBuffer.equals("")) {
            while (recordReader.nextKeyValue()) {
                valueBuffer.append(recordReader.getCurrentValue());
                valueBuffer.append(LINE_SEPARATOR);
            }
            value.set(valueBuffer.toString());
            return true;
        }
        return false;
    }

}

And the incomplete FileInputFormat class is:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class CustomFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1,
            Reporter arg2) throws IOException {
        return null;
    }
}

Solution

Have this code in your CustomRecordReader class.

private LineRecordReader lineReader;

// LineRecordReader keys records by byte offset, so keep a throwaway LongWritable for it
private LongWritable lineKey;

private String fileName;

public CustomRecordReader(JobConf job, FileSplit split) throws IOException {
    lineReader = new LineRecordReader(job, split);
    lineKey = lineReader.createKey();
    fileName = split.getPath().getName();
}

public boolean next(Text key, Text value) throws IOException {
    // get the next line; the byte-offset key from LineRecordReader is discarded
    if (!lineReader.next(lineKey, value)) {
        return false;
    }

    // emit the file name as the key; the value already holds the line just read
    key.set(fileName);

    return true;
}

public Text createKey() {
    return new Text("");
}

public Text createValue() {
    return new Text("");
}
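
The snippet above emits one record per line, with the file name repeated as the key. If, as in the original question, you want the entire file contents as a single value, the same wrapper can drain the LineRecordReader in one call. A rough sketch, assuming each file fits comfortably in memory (the processed flag and the newline separator are illustrative choices, not something prescribed by the API):

private boolean processed = false;   // illustrative flag: emit exactly one record per file

public boolean next(Text key, Text value) throws IOException {
    if (processed) {
        return false;
    }

    // accumulate every line of the (non-splittable) file into a single value
    StringBuilder contents = new StringBuilder();
    Text line = lineReader.createValue();
    while (lineReader.next(lineKey, line)) {
        contents.append(line.toString()).append("\n");
    }

    key.set(fileName);
    value.set(contents.toString());
    processed = true;
    return true;
}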

Remove the SPDRecordReader constructor (it is an error: the class is named CustomRecordReader, so a constructor called SPDRecordReader will not compile).

And have this code in your CustomFileInputFormat class:

public RecordReader<Text, Text> getRecordReader(
  InputSplit input, JobConf job, Reporter reporter)
  throws IOException {

    reporter.setStatus(input.toString());
    return new CustomRecordReader(job, (FileSplit)input);
}
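
For completeness, here is a minimal driver sketch showing how the pieces fit together, using the same old org.apache.hadoop.mapred API as above; the class name FileNameKeyJob, the job name, and the command-line paths are just placeholders:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class FileNameKeyJob {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(FileNameKeyJob.class);
        conf.setJobName("filename-as-key");

        // plug in the custom input format so each map() call receives
        // (file name, line) instead of the usual (byte offset, line)
        conf.setInputFormat(CustomFileInputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

With no mapper or reducer set, the identity defaults simply pass the (file name, line) pairs through; in a real job you would register your own Mapper<Text, Text, ...> with conf.setMapperClass(...).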
