Reading images from HDFS using mapreduce


Problem description

Please help me with this code. I am trying to read images from HDFS, using WholeFileInputFormat with WholeFileRecordReader. There are no compile-time errors, but the code fails at runtime with: cannot create the instance of the given class WholeFileInputFormat. I wrote this code following the comments on How to read multiple image files as input from hdfs in map-reduce? It contains 3 classes. How do I debug this, or is there another way to do it?

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class map2 extends Configured implements Tool {


    public static class MapClass extends MapReduceBase
            implements Mapper<NullWritable, BytesWritable, Text, Text> {


        private Text input_image = new Text();
        private Text input_vector = new Text();


        @Override
        public void map(NullWritable key, BytesWritable value,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {

            System.out.println("CorrelogramIndex Method:");
            String featureString;
            int MAXIMUM_DISTANCE = 16;
            AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;

            byte[] identifier = value.getBytes();
            BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier));

            AutoColorCorrelogram vd = new AutoColorCorrelogram(MAXIMUM_DISTANCE, mode);

            vd.extract(bimg);

            featureString = vd.getStringRepresentation();
            double[] bytearray = vd.getDoubleHistogram();

            System.out.println("image: " + identifier + " " + featureString);




            System.out.println(" ------------- ");


            input_image.set(identifier);
            input_vector.set(featureString);
            output.collect(input_image, input_vector);




        }
    }



    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            String out_vector = "";

            while (values.hasNext()) {
                out_vector += (values.next().toString());
            }
            output.collect(key, new Text(out_vector));
        }
    }

    static int printUsage() {
        System.out.println("map2 [-m <maps>] [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }


    @Override
    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(getConf(), map2.class);
        conf.setJobName("image_mapreduce");

        conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(MapClass.class);
        conf.setReducerClass(Reduce.class);

        List<String> other_args = new ArrayList<>();
        for (int i = 0; i < args.length; ++i) {
            try {
                switch (args[i]) {
                    case "-m":
                        conf.setNumMapTasks(Integer.parseInt(args[++i]));
                        break;
                    case "-r":
                        conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                        break;
                    default:
                        other_args.add(args[i]);
                        break;
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();
            }
        }

        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            return printUsage();
        }

        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new map2(), args);
        System.exit(res);
    }
}
 -----------------------------------------------------------------------------------
//WholeFileInputFormat

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;

public class WholeFileInputFormat<NullWritable, BytesWritable> 
        extends FileInputFormat<NullWritable, BytesWritable> {

    //  @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    //@Override

    public WholeFileRecordReader createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }




    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split,
            JobConf job, Reporter reporter) throws IOException;
}

 -----------------------------------------------------------------------------------
//WholeFileRecordReader

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;

 class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {   //recordreader

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getJobConf();
    }

    @Override
    public boolean next(NullWritable k, BytesWritable v) throws IOException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }
    @Override
    public NullWritable createKey() {
        return NullWritable.get();
    }

    @Override
    public BytesWritable createValue() {
        return value;
    }

    @Override
    public long getPos() throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public void close() throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public float getProgress() throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}

Recommended answer

WholeFileInputFormat is defined as abstract, so how do you expect to create an instance of it?

Either make it non-abstract or subclass it with a concrete implementation.
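
A minimal sketch of the "concrete implementation" route follows, assuming the driver stays on the old org.apache.hadoop.mapred API (JobConf, conf.setInputFormat(...)) that map2 already uses. It drops the generic parameters <NullWritable, BytesWritable> from the class declaration (as written they declare new type variables that shadow the real Writable classes) and gives getRecordReader an actual body; the constructor it calls on WholeFileRecordReader is an assumed addition, shown after the class.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Concrete old-API input format: each input file becomes exactly one record.
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // An image file must never be split across map tasks.
    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        return false;
    }

    // Implemented with a body, so the class is concrete and the framework can
    // instantiate it from conf.setInputFormat(WholeFileInputFormat.class).
    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        // Assumed constructor on WholeFileRecordReader, see below.
        return new WholeFileRecordReader((FileSplit) split, job);
    }
}

The record reader would then need a matching constructor in place of the new-API initialize(InputSplit, TaskAttemptContext) method, which the old-API framework never calls:

    // Added to WholeFileRecordReader: the old API hands over the split and JobConf directly.
    public WholeFileRecordReader(FileSplit fileSplit, JobConf conf) {
        this.fileSplit = fileSplit;
        this.conf = conf;
    }

Once the class can be instantiated, note that getPos(), getProgress(), and close() are still called by the framework during the job, so returning simple values (for example 0, processed ? 1.0f : 0.0f, and a no-op close) is safer than throwing UnsupportedOperationException.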

