在Hadoop中使用WholeFileInputFormat MapReduce仍然会导致Mapper一次处理1行 [英] Using WholeFileInputFormat with Hadoop MapReduce still results in Mapper processing 1 line at a time

查看:150
本文介绍了在Hadoop中使用WholeFileInputFormat MapReduce仍然会导致Mapper一次处理1行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用Hadoop 2.6扩展我的标题,并且需要将整个文件发送到我的映射器,而不是一次一行。我在权威指南中关注了Tom Whites代码以创建WholeFileInputFormat和WholeFileRecordReader,但我的Mapper仍然一次处理1行文件。任何人都可以看到我在代码中缺少的东西吗?我使用了我能看到的书中的例子。任何指导将不胜感激。



WholeFileInputFormat.java

  public类WholeFileInputFormat扩展FileInputFormat< NullWritable,BytesWritable> {

@Override
保护布尔isSplitable(JobContext上下文,路径文件){
return false;
}

@Override
public RecordReader< NullWritable,BytesWritable> createRecordReader(
InputSplit split,TaskAttemptContext context)throws IOException,InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split,context);
回报阅读器;
}

}

WholeFileRecordReader.java

  public class WholeFileRecordReader扩展了RecordReader< NullWritable,BytesWritable> {
private FileSplit fileSplit;
私人配置conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
$ b @Override
public void initialize(InputSplit split,TaskAttemptContext context)throws IOException,InterruptedException {
this.fileSplit =(FileSplit)split;
this.conf = context.getConfiguration();

$ b @Override
public boolean nextKeyValue()throws IOException,InterruptedException {
if(!processed){
byte [] contents = new byte [(int)fileSplit.getLength()];
路径文件= fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
尝试{
in = fs.open(file);
IOUtils.readFully(in,contents,0,contents.length);
value.set(contents,0,contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
返回true;
}
返回false;
}

@Override
public NullWritable getCurrentKey()throws IOException,InterruptedException {
return NullWritable.get();
}

@Override
public BytesWritable getCurrentValue()抛出IOException,InterruptedException {
返回值;
}

@Override
public float getProgress()throws IOException {
return processed? 1.0f:0.0f;

$ b @Override $ b $ public void close()throws IOException {
// do nothing :)
}



$ b

以及我的Mapreduce的主要方法

  public class ECCCount {
public static void main(String [] args)throws Exception {

if(args System.out.printf(Usage:ProcessLogs< input dir>< output dir> \\\
);
System.exit(-1);


// @ SuppressWarnings(deprecation)
Job job = new Job();
job.setJarByClass(ECCCount.class);
job.setJobName(ECCCount);

//FileInputFormat.setInputPaths(job,new Path(args [0]));
WholeFileInputFormat.setInputPaths(job,new Path(args [0]));
FileOutputFormat.setOutputPath(job,new Path(args [1]));

job.setMapperClass(ECCCountMapper.class);
job.setReducerClass(SumReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

布尔成功= job.waitForCompletion(true);
System.exit(成功?0:1);
}

}

而我的Mapper是很好的衡量标准。现在它只是返回给定的值作为测试用例,以查看它是否返回一行或整个文件

  public class ECCCountMapper扩展Mapper< LongWritable,Text,Text,IntWritable> {
@Override
public void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException {

context.write(new Text(value),new IntWritable(1));
}

}

解决方案

感谢Ramzy的输入,我发现我的错误,并能够通过以下更改传递整个文件



在我的主要方法我需要指定我需要使用的InputFormatClass。

  job.setInputFormatClass(WholeFileInputFormat.class)

并且我的Mapper需要预期正确的类型作为输入

  public class ECCCountMapper扩展了Mapper< NullWritable,BytesWritable,Text,IntWritable> {

这两个更改成功地将整个文件的一个字节[]发送到我的映射器,并根据需要对其进行处理。


To expand on my header in using Hadoop 2.6.. and need to send whole files to my mapper instead of a single line at a time. I have followed Tom Whites code in the Definitive Guide to create WholeFileInputFormat and WholeFileRecordReader but my Mapper is still processing files 1 line at a time. Can anyone see what I'm missing in my code? I used the book example exactly from what I can see. Any guidance will be much appreciated.

WholeFileInputFormat.java

public class WholeFileInputFormat extends FileInputFormat <NullWritable, BytesWritable>{

@Override
protected boolean isSplitable(JobContext context, Path file){
    return false;
}

@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    WholeFileRecordReader reader = new WholeFileRecordReader();
    reader.initialize(split, context);
    return reader;
}

}

WholeFileRecordReader.java

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{
    this.fileSplit = (FileSplit) split;
    this.conf = context.getConfiguration();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException{
    if (!processed){
        byte[] contents = new byte[(int) fileSplit.getLength()];
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try{
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        }finally{
            IOUtils.closeStream(in);
        }
        processed = true;
        return  true;
    }
    return false;
}

@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException{
    return NullWritable.get();
}

@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException{
    return value;
}

@Override
public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
}

@Override
public void close() throws IOException{
    //do nothing :)
}

}

And the main method for my Mapreduce

public class ECCCount {
public static void main(String[] args) throws Exception {

    if (args.length != 2) {
      System.out.printf("Usage: ProcessLogs <input dir> <output dir>\n");
      System.exit(-1);
    }

    //@SuppressWarnings("deprecation")
    Job job = new Job();
    job.setJarByClass(ECCCount.class);
    job.setJobName("ECCCount");

    //FileInputFormat.setInputPaths(job, new Path(args[0]));
    WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(ECCCountMapper.class);
    job.setReducerClass(SumReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    boolean success = job.waitForCompletion(true);
    System.exit(success ? 0 : 1);
  }

}

And my Mapper for good measure. Right now it simply returns the value its given as a test case to see if its returning a line or whole file

public class ECCCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

      context.write(new Text(value), new IntWritable(1));
  }

}

解决方案

Thanks to Ramzy's input I found my error and was able to get the whole file passed with the following changes

In my main method I needed to specify the InputFormatClass I needed to use.

job.setInputFormatClass(WholeFileInputFormat.class)

and my Mapper needs to expect the correct types as input

public class ECCCountMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable>{

Those two changes successfully sent a byte[] of the entire file to my mapper where I manipulate it as needed.

这篇关于在Hadoop中使用WholeFileInputFormat MapReduce仍然会导致Mapper一次处理1行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆