Reading images from HDFS using mapreduce


Problem description

Please help me with this code. I am trying to read images from HDFS using WholeFileInputFormat with WholeFileRecordReader. There are no compile-time errors, but the job fails at runtime and the output says: cannot create the instance of the given class WholeFileInputFormat. I wrote this code following the comments on How to read multiple image files as input from hdfs in map-reduce? It contains 3 classes. How can I debug it? Or is there any other way?

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class map2 extends Configured implements Tool {


    public static class MapClass extends MapReduceBase
            implements Mapper<NullWritable, BytesWritable, Text, Text> {


        private Text input_image = new Text();
        private Text input_vector = new Text();


        @Override
        public void map(NullWritable key, BytesWritable value,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {

            System.out.println("CorrelogramIndex Method:");
            String featureString;
            int MAXIMUM_DISTANCE = 16;
            AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;

            byte[] identifier = value.getBytes();

            BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier));

            AutoColorCorrelogram vd = new AutoColorCorrelogram(MAXIMUM_DISTANCE, mode);

            vd.extract(bimg);

            featureString = vd.getStringRepresentation();
            double[] bytearray = vd.getDoubleHistogram();

            System.out.println("image: " + identifier + " " + featureString);




            System.out.println(" ------------- ");


            input_image.set(identifier);
            input_vector.set(featureString);
            output.collect(input_image, input_vector);




        }
    }



    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            String out_vector = "";

            while (values.hasNext()) {
                out_vector += (values.next().toString());
            }
            output.collect(key, new Text(out_vector));
        }
    }

    static int printUsage() {
        System.out.println("map2 [-m <maps>] [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }


    @Override
    public int run(String[] args) throws Exception {



        JobConf conf = new JobConf(getConf(), map2.class);
        conf.setJobName("image_mapreduce");

        conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(MapClass.class);
        conf.setReducerClass(Reduce.class);

        List<String> other_args = new ArrayList<>();
        for (int i = 0; i < args.length; ++i) {
            try {
                switch (args[i]) {
                    case "-m":
                        conf.setNumMapTasks(Integer.parseInt(args[++i]));
                        break;
                    case "-r":
                        conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                        break;
                    default:
                        other_args.add(args[i]);
                        break;
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();
            }
        }

        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            return printUsage();
        }




        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new map2(), args);
        System.exit(res);
    }
}
 -----------------------------------------------------------------------------------
//WholeFileInputFormat

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;

public class WholeFileInputFormat<NullWritable, BytesWritable> 
        extends FileInputFormat<NullWritable, BytesWritable> {

    //  @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    //@Override

    public WholeFileRecordReader createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split,
            JobConf job, Reporter reporter)
            throws IOException;
}

 -----------------------------------------------------------------------------------
//WholeFileRecordReader

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;

 class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {   //recordreader

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getJobConf();
    }

    @Override
    public boolean next(NullWritable k, BytesWritable v) throws IOException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }
    @Override
    public NullWritable createKey() {
        return NullWritable.get();
    }

    @Override
    public BytesWritable createValue() {
        return value;
    }

    @Override
    public long getPos() throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public void close() throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public float getProgress() throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}

Solution

WholeFileInputFormat is defined as abstract, so how do you expect to create an instance of it?

Either make it not abstract or subclass it with a concrete implementation.
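For reference, here is a minimal sketch of the second option: a concrete, non-abstract WholeFileInputFormat, staying with the old org.apache.hadoop.mapred API that the driver already uses. It assumes WholeFileRecordReader is given a constructor taking the FileSplit and the job Configuration (replacing the new-API initialize(InputSplit, TaskAttemptContext) method); the rest of the posted reader can stay as it is. The type parameters <NullWritable, BytesWritable> on the original class declaration are also dropped, since they declared new type variables instead of referring to the Hadoop classes.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Sketch: a concrete input format that the framework can instantiate.
public class WholeFileInputFormat
        extends FileInputFormat<NullWritable, BytesWritable> {

    // Each image file is handed to the mapper as a single record, so never split it.
    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        return false;
    }

    // Implementing the abstract getRecordReader() is what makes this class concrete.
    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        // Assumes WholeFileRecordReader has a (FileSplit, Configuration) constructor.
        return new WholeFileRecordReader((FileSplit) split, job);
    }
}

The matching reader constructor (hypothetical, replacing initialize()) would simply be:

    public WholeFileRecordReader(FileSplit fileSplit, Configuration conf) {
        this.fileSplit = fileSplit;
        this.conf = conf;
    }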
