Hadoop: Help needed while implementing a custom FileInputFormat class


Problem description

I'm trying to implement a Map/Reduce job with Hadoop for a college assignment, but at the moment I'm completely stuck implementing a custom FileInputFormat class to read the whole content of a file into my mapper.

I took the example from "Hadoop: The Definitive Guide" without any changes. I can compile my source code, but when I run it, it throws the exception below (I'm currently using Hadoop 1.0.2 on Debian 5.0):

Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.ExampleFileInputFormat$WholeFileInputFormat.<init>()
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
    at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:575)
    at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:989)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:981)
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824)
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261)
    at org.myorg.ExampleFileInputFormat.run(ExampleFileInputFormat.java:163)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
    at org.myorg.ExampleFileInputFormat.main(ExampleFileInputFormat.java:172)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.NoSuchMethodException: org.myorg.ExampleFileInputFormat$WholeFileInputFormat.<init>()
    at java.lang.Class.getConstructor0(Class.java:2706)
    at java.lang.Class.getDeclaredConstructor(Class.java:1985)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:109)
    ... 21 more

I'm somewhat frustrated because I don't understand what's happening, and a web search hasn't turned up anything. Maybe some of you could take a look at my source. It's stripped down at the moment for debugging purposes.


package org.myorg;
/*
 * 
 * 
 */
import java.io.IOException;
import java.util.*;
import java.io.*;
import java.util.regex.Pattern;
import java.util.regex.Matcher;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;


public class ExampleFileInputFormat extends Configured implements Tool {

     /*
      *  <generics>
      */
    public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

        @Override
        protected boolean isSplitable(FileSystem fs, Path filename) {
            return false;
        }

        @Override
        public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
            return new WholeFileRecordReader((FileSplit) split, job);   
        }
    }

    public class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {

        private FileSplit fileSplit;
        private Configuration conf;
        private boolean processed = false;

        public WholeFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException {
            this.fileSplit = fileSplit;
            this.conf = conf;
        }

        @Override
        public NullWritable createKey() {
            return NullWritable.get();
        }

        @Override
        public BytesWritable createValue() {
            return new BytesWritable();
        }

        @Override
        public long getPos() throws IOException {
            return processed ? fileSplit.getLength() : 0;
        }

        @Override
        public float getProgress() throws IOException {
            return processed ? 1.0f : 0.0f;
        }

        @Override
        public boolean next(NullWritable key, BytesWritable value) throws IOException {
            if (!processed) {
                  byte[] contents = new byte[(int) fileSplit.getLength()];
                  Path file = fileSplit.getPath();
                  FileSystem fs = file.getFileSystem(conf);
                  FSDataInputStream in = null;
                  try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);
                  } finally {
                    IOUtils.closeStream(in);
                  }
                  processed = true;
                  return true;
            }
            return false;
        }

        @Override
        public void close() throws IOException {
            // do nothing
        }
    }
      /* </generics> */


    /* 
     * <Task1>: 
     * */
    public static class ExampleMap extends MapReduceBase implements Mapper<NullWritable, BytesWritable, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      public void map(NullWritable key, BytesWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
          output.collect(new Text("test"), one);
        }
      }
    public static class ExampleReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }
    /* </Task1> */
    /*
     * <run>
     **/
    public int run(String[] args) throws Exception {

        if (args.length != 3) {
            printUsage();
            return 1;
        }

        String useCase = args[0];
        String inputPath = args[1];
        String outputPath = args[2];

        deleteOldOutput(outputPath);        


        JobConf conf = new JobConf(ExampleFileInputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));      
        FileInputFormat.setInputPaths(conf, new Path(inputPath));

        /* conf: Task1 */
        if (useCase.equals("cc_p")) {
            conf.setJobName("WordCount");
            /* Output: Key:Text -> Value:Integer */
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);
            conf.setOutputFormat(TextOutputFormat.class);
            /* Input: Key.Text -> Value:Text */
            conf.setInputFormat(WholeFileInputFormat.class);
            conf.setMapperClass(ExampleMap.class);
            conf.setReducerClass(ExampleReduce.class);
        }
        /* default-option: Exit */
        else {
            printUsage();
            return 1;
        }

        JobClient.runJob(conf);
        return 0;
    }
    /* </run> */

    /*
     * <Main>
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new ExampleFileInputFormat(), args);
        System.exit(res);
    }
    /* </Main> */

    /* 
    * <Helper>
    */
    private void printUsage() {
        System.out.println("usage: [usecase] [input-path] [output-path]");
        return;
    }

    private void deleteOldOutput(String outputPath) throws IOException {
        // Delete the output directory if it exists already
        Path outputDir = new Path(outputPath);
        FileSystem.get(getConf()).delete(outputDir, true);
    }
    /* </Helper> */
}


Can anyone help me out?

Greetings from Germany, Alex

Solution

You need to make the inner class static:

public static class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

Otherwise javac generates a constructor that expects an instance of the enclosing ExampleFileInputFormat class to be passed in.
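To see why the no-arg constructor disappears, here is a minimal, Hadoop-free sketch of the same situation. The class names ConstructorDemo, Inner and Nested are hypothetical and only illustrate the difference between a non-static inner class and a static nested class; the reflective lookup mirrors the getDeclaredConstructor() call visible in the stack trace above.

import java.lang.reflect.Constructor;

public class ConstructorDemo {

    // Non-static inner class: javac adds a hidden constructor parameter of the
    // enclosing type, so there is no true no-arg constructor to reflect on.
    public class Inner { }

    // Static nested class: it gets the usual implicit no-arg constructor,
    // which a reflective factory such as ReflectionUtils.newInstance can call.
    public static class Nested { }

    public static void main(String[] args) {
        check(Inner.class);
        check(Nested.class);
    }

    private static void check(Class<?> cls) {
        System.out.println(cls.getSimpleName() + " declared constructors:");
        for (Constructor<?> c : cls.getDeclaredConstructors()) {
            System.out.println("  " + c);
        }
        try {
            cls.getDeclaredConstructor();  // the lookup that fails in the stack trace
            System.out.println("  -> no-arg constructor found");
        } catch (NoSuchMethodException e) {
            System.out.println("  -> NoSuchMethodException (same root cause as above)");
        }
    }
}

Note that making WholeFileInputFormat static probably also means making WholeFileRecordReader static (or a top-level class), because a static nested class has no enclosing ExampleFileInputFormat instance with which to construct a non-static inner reader.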
