Implementation of CombineFileInputFormat for Hadoop 0.20.205
Question
Can someone please point out where I could find an implementation of CombineFileInputFormat (org.apache.hadoop.mapred.lib.CombineFileInputFormat) for Hadoop 0.20.205? The goal is to create large splits from very small log files (text in lines) using EMR.
It is surprising that Hadoop does not ship a default implementation of this class for exactly this purpose, and judging from a web search I am not the only one confused by that. I need to compile the class and bundle it into a jar for hadoop-streaming; with limited Java knowledge this is something of a challenge.
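The compile-and-bundle step mentioned above can be sketched roughly as follows. This is only an outline: the jar paths, file names, and input/output locations are assumptions that vary by Hadoop/EMR installation, and the commands require a working Hadoop environment.

```shell
# Sketch only -- paths and jar names are assumptions; adjust to your install.
# 1. Compile against the Hadoop 0.20.205 core jar:
javac -classpath /home/hadoop/hadoop-core-0.20.205.0.jar CombinedInputFormat.java

# 2. Bundle the generated classes (including the nested reader class) into a jar:
jar cf combinedinputformat.jar CombinedInputFormat*.class

# 3. Pass the jar to hadoop-streaming with -libjars and name the class
#    as the input format:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar \
    -libjars combinedinputformat.jar \
    -inputformat CombinedInputFormat \
    -input /logs/small-files/ \
    -output /logs/combined-out/ \
    -mapper cat \
    -reducer 'wc -l'
```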
EDIT:
I already tried the yetitrails example, with the necessary imports, but I get a compiler error for the next method.
Answer
Here is an implementation I have for you:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
        // CombineFileRecordReader instantiates myCombineFileRecordReader (via
        // reflection) once for each file chunk inside the combined split.
        return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
    }

    public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {

        private final LineRecordReader linerecord;

        // The index selects which file inside the CombineFileSplit this reader
        // handles; all record-reading is delegated to a plain LineRecordReader
        // over that single file.
        public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
            FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
            linerecord = new LineRecordReader(conf, filesplit);
        }

        @Override
        public void close() throws IOException {
            linerecord.close();
        }

        @Override
        public LongWritable createKey() {
            return linerecord.createKey();
        }

        @Override
        public Text createValue() {
            return linerecord.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return linerecord.getPos();
        }

        @Override
        public float getProgress() throws IOException {
            return linerecord.getProgress();
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            return linerecord.next(key, value);
        }
    }
}
In your job, first set the parameter mapred.max.split.size according to the size you would like the input files to be combined into. Do something like the following in your run():
    ...
    if (argument != null) {
        conf.set("mapred.max.split.size", argument);
    } else {
        conf.set("mapred.max.split.size", "134217728"); // 128 MB
    }
    ...
    conf.setInputFormat(CombinedInputFormat.class);
    ...
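A quick sanity check on that default value: 134217728 bytes is exactly 128 MB, and dividing your total input size by it gives a rough lower bound on the number of combined splits (and hence map tasks). The file counts below are purely illustrative, not from the question.

```shell
# 134217728 = 128 * 1024 * 1024, i.e. exactly 128 MB
echo $((128 * 1024 * 1024))            # prints 134217728

# Rough estimate (ceiling division) of combined splits for, say,
# 10,000 small log files of ~1 MB each (illustrative numbers):
total=$((10000 * 1000000))
max=134217728
echo $(( (total + max - 1) / max ))    # prints 75
```

So instead of 10,000 map tasks (one per tiny file), the job would run on the order of 75, which is the whole point of combining the inputs.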