Using WholeFileInputFormat with Hadoop MapReduce still results in Mapper processing 1 line at a time
Problem description
To expand on my title: I'm using Hadoop 2.6 and need to send whole files to my mapper instead of a single line at a time. I have followed Tom White's code in the Definitive Guide to create WholeFileInputFormat and WholeFileRecordReader, but my Mapper is still processing files one line at a time. Can anyone see what I'm missing in my code? As far as I can tell I used the book example exactly. Any guidance will be much appreciated.
WholeFileInputFormat.java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
WholeFileRecordReader.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing :)
    }
}
And the main method for my MapReduce job:
public class ECCCount {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: ProcessLogs <input dir> <output dir>\n");
            System.exit(-1);
        }
        //@SuppressWarnings("deprecation")
        Job job = new Job();
        job.setJarByClass(ECCCount.class);
        job.setJobName("ECCCount");
        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(ECCCountMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
And my Mapper, for good measure. Right now it simply emits the value it's given, as a test case to see whether it receives a single line or the whole file:
public class ECCCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(value), new IntWritable(1));
    }
}
Solution
Thanks to Ramzy's input I found my error and was able to get the whole file passed to the mapper with the following changes.
In my main method I needed to specify the InputFormatClass to use; without it, the job falls back to the default TextInputFormat, which hands the mapper one line at a time:
job.setInputFormatClass(WholeFileInputFormat.class);
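For context, a sketch of where that call slots into the driver's job setup (an excerpt of the main method shown above, with only the setInputFormatClass line added):

        Job job = new Job();
        job.setJarByClass(ECCCount.class);
        job.setJobName("ECCCount");
        // use the custom input format instead of the default TextInputFormat
        job.setInputFormatClass(WholeFileInputFormat.class);
        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));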
And my Mapper needs to expect the correct types as input:
public class ECCCountMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable> {
Those two changes successfully sent a byte[] of the entire file to my mapper where I manipulate it as needed.
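For reference, a minimal sketch of what the updated mapper could look like. The body here (copying the bytes out of the BytesWritable and writing them back out as text with a count of 1) is only a placeholder for the real processing:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ECCCountMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable> {
    @Override
    public void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // the BytesWritable backing array may be longer than the data, so copy only the valid bytes
        byte[] fileContents = Arrays.copyOf(value.getBytes(), value.getLength());
        // placeholder processing: emit the whole file contents as a single Text key with a count of 1
        context.write(new Text(new String(fileContents, StandardCharsets.UTF_8)), new IntWritable(1));
    }
}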