使用mapreduce从HDFS读取图像 [英] Reading images from HDFS using mapreduce
问题描述
请在此代码中帮助我。我正在尝试从HDFS重新引导图像。我正在使用WholeFileInputFormat。与WholeFileRecordreader。没有编译时错误。但是代码给出了运行时错误。
输出结果是:无法创建给定类WholeFileInputFormat的实例。
我已根据如何在map-reduce中读取多个图像文件作为hdfs的输入?
请在此代码中帮助我。它包含3类。如何调试它?或者任何其他方式?
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
导入net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
导入org.apache.hadoop.conf.Configuration;
导入org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
导入org.apache.hadoop.fs.Path;
import org.apache.hadoop.io。*;
import org.apache.hadoop.mapred。*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class map2 extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper< NullWritable,BytesWritable,Text,Text> {
私人文本input_image = new Text();
private文本input_vector = new Text();
$ b @覆盖
public void map(NullWritable key,BytesWritable value,
OutputCollector< Text,Text>输出,
Reporter记者)抛出IOException {
System.out.println(CorrelogramIndex Method:);
字符串featureString;
int MAXIMUM_DISTANCE = 16;
AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;
byte [] identifier = value.getBytes();
BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier));
AutoColorCorrelogram vd = new AutoColorCorrelogram(MAXIMUM_DISTANCE,mode);
vd.extract(bimg);
featureString = vd.getStringRepresentation();
double [] bytearray = vd.getDoubleHistogram();
System.out.println(image:+ identifier ++ featureString);
System.out.println(-------------);
input_image.set(identifier);
input_vector.set(featureString);
output.collect(input_image,input_vector);
$ b public static class Reduce extends MapReduceBase
实现了Reducer<文本,文本,文本,文本> {
@Override
public void reduce(Text key,Iterator< Text> values,
OutputCollector< Text,Text> output,
Reporter reporter)throws IOException {
String out_vector =;
while(values.hasNext()){
out_vector + =(values.next()。toString());
}
output.collect(key,new Text(out_vector));
static int printUsage(){
System.out.println(map2 [-m< maps>] [-r< reduced> ;]< input>< output>);
ToolRunner.printGenericCommandUsage(System.out);
返回-1;
$ b @Override
public int run(String [] args)throws Exception {
JobConf conf = new JobConf(getConf(),map2.class);
conf.setJobName(image_mapreduce);
conf.setInputFormat(WholeFileInputFormat.class);
conf.setOutputFormat(NullOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(MapClass.class);
conf.setReducerClass(Reduce.class);
List< String> other_args = new ArrayList<>();
for(int i = 0; i< args.length; ++ i){
try {
switch(args [i]){
case-m :
conf.setNumMapTasks(Integer.parseInt(args [++ i]));
休息;
case-r:
conf.setNumReduceTasks(Integer.parseInt(args [++ i]));
休息;
默认值:
other_args.add(args [i]);
休息;
}
} catch(NumberFormatException除外){
System.out.println(错误:预期为整数而不是+ args [i]);
return printUsage();
} catch(ArrayIndexOutOfBoundsException except){
System.out.println(错误:必需的参数缺少来自
+ args [i - 1]);
return printUsage();
}
}
//确保剩下2个参数。
if(other_args.size()!= 2){
System.out.println(错误:错误的参数数量:
+ other_args.size()+而不是2 。);
return printUsage();
}
FileInputFormat.setInputPaths(conf,other_args.get(0));
FileOutputFormat.setOutputPath(conf,new Path(other_args.get(1)));
JobClient.runJob(conf);
返回0;
$ b $ public static void main(String [] args)throws Exception {
int res = ToolRunner.run(new Configuration(),new map2(),args);
System.exit(res);
}
}
------------------------------------ -----------------------------------------------
// WholeFileInputFormat
import java.io.IOException;
导入org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred。*;
public class WholeFileInputFormat< NullWritable,BytesWritable>
扩展FileInputFormat< NullWritable,BytesWritable> {
// @覆盖
保护布尔isSplitable(JobContext上下文,路径文件){
return false;
}
// @覆盖
public WholeFileRecordReader createRecordReader(
InputSplit split,TaskAttemptContext上下文)throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split,context);
回报阅读器;
}
@覆盖
public RecordReader< NullWritable,BytesWritable> getRecordReader(InputSplit split,
JobConf作业,Reporter记者)
抛出IOException;
}
------------------------------------- ------------------------------------------
// WholeInputFileRecorder
import java.io.IOException;
导入org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
导入org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;
类WholeFileRecordReader实现RecordReader< NullWritable,BytesWritable> {//记录读取器
私有FileSplit fileSplit;
私人配置conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
public void initialize(InputSplit split,TaskAttemptContext context)
throws IOException,InterruptedException {
this.fileSplit =(FileSplit)split;
this.conf = context.getJobConf();
$ b @Override
public boolean next(NullWritable k,BytesWritable v)抛出IOException {
if(!processed){
byte [] contents = new byte [(int)fileSplit.getLength()];
路径文件= fileSplit.getPath();
org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
尝试{
in = fs.open(file);
IOUtils.readFully(in,contents,0,contents.length);
value.set(contents,0,contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
返回true;
}
返回false;
}
@Override
public NullWritable createKey(){
return NullWritable.get();
}
@Override
public BytesWritable createValue(){
返回值;
$ b @Override
public long getPos()throws IOException {
throw new UnsupportedOperationException(Not supported yet。);
$ b @Override
public void close()throws IOException {
throw new UnsupportedOperationException(Not supported yet。);
$ b @Override
public float getProgress()throws IOException {
throw new UnsupportedOperationException(Not supported yet。);
}
}
WholeFileInputFormat
被定义为抽象的,你想如何创建它的一个实例?
要么使它不是抽象的或用一个具体的实现对它进行子类化。
Please help me in this code. I am trying to reiad images from HDFS. I am using WholeFileInputFormat. with WholeFileRecordreader. No compile time errors.But the code is giving runtime errors. The output is saying: cannot create the instance of the given class WholeFileInputFormat. I have written this code according to the comments on How to read multiple image files as input from hdfs in map-reduce? Please help me in this code.It contains 3 classes.How to debug it? Or any other way?
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class map2 extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<NullWritable, BytesWritable, Text, Text> {
private Text input_image = new Text();
private Text input_vector = new Text();
@Override
public void map(NullWritable key,BytesWritable value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
System.out.println("CorrelogramIndex Method:");
String featureString;
int MAXIMUM_DISTANCE = 16;
AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;
byte[] identifier=value.getBytes();
BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier));
AutoColorCorrelogram vd = new AutoColorCorrelogram(MAXIMUM_DISTANCE, mode);
vd.extract(bimg);
featureString = vd.getStringRepresentation();
double[] bytearray = vd.getDoubleHistogram();
System.out.println("image: " + identifier + " " + featureString);
System.out.println(" ------------- ");
input_image.set(identifier);
input_vector.set(featureString);
output.collect(input_image, input_vector);
}
}
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String out_vector = "";
while (values.hasNext()) {
out_vector += (values.next().toString());
}
output.collect(key, new Text(out_vector));
}
}
static int printUsage() {
System.out.println("map2 [-m <maps>] [-r <reduces>] <input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), map2.class);
conf.setJobName("image_mapreduce");
conf.setInputFormat(WholeFileInputFormat.class);
conf.setOutputFormat(NullOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(MapClass.class);
conf.setReducerClass(Reduce.class);
List<String> other_args = new ArrayList<>();
for (int i = 0; i < args.length; ++i) {
try {
switch (args[i]) {
case "-m":
conf.setNumMapTasks(Integer.parseInt(args[++i]));
break;
case "-r":
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
break;
default:
other_args.add(args[i]);
break;
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from "
+ args[i - 1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: "
+ other_args.size() + " instead of 2.");
return printUsage();
}
FileInputFormat.setInputPaths(conf, other_args.get(0));
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new map2(), args);
System.exit(res);
}
}
-----------------------------------------------------------------------------------
//WholeFileInputFormat
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
public class WholeFileInputFormat<NullWritable, BytesWritable>
extends FileInputFormat<NullWritable, BytesWritable> {
// @Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
//@Override
public WholeFileRecordReader createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
@Override
public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split,
JobConf job, Reporter reporter)
throws IOException;
}
-------------------------------------------------------------------------------
//WholeInputFileRecorder
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;
class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> { //recordreader
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getJobConf();
}
@Override
public boolean next(NullWritable k, BytesWritable v) throws IOException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable createKey() {
return NullWritable.get();
}
@Override
public BytesWritable createValue() {
return value;
}
@Override
public long getPos() throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void close() throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public float getProgress() throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
}
WholeFileInputFormat
is defined as abstract, how do you want to create an instance of it?
Either make it not abstract or subclass it with a concrete implementation.
这篇关于使用mapreduce从HDFS读取图像的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!