Hadoop: NullPointerException with Custom InputFormat


Question

I've developed a custom InputFormat for Hadoop (including a custom InputSplit and a custom RecordReader) and I'm experiencing a rare NullPointerException.

These classes are going to be used for querying a third-party system which exposes a REST API for retrieving records. Thus, I took inspiration from DBInputFormat, which is a non-HDFS InputFormat as well.

The error I get is the following:

Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:762)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

I've looked through the code of MapTask (Hadoop 2.1.0) and the problematic part seems to be the initialization of the RecordReader:

472 NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
473       org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
474       TaskReporter reporter,
475       org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
476       throws InterruptedException, IOException {
...
491    this.real = inputFormat.createRecordReader(split, taskContext);
...
494 }
...
519 @Override
520 public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
521       org.apache.hadoop.mapreduce.TaskAttemptContext context
522       ) throws IOException, InterruptedException {
523    long bytesInPrev = getInputBytes(fsStats);
524    real.initialize(split, context);
525    long bytesInCurr = getInputBytes(fsStats);
526    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
527 }

Of course, the relevant parts of my code:

# MyInputFormat.java

public static void setEnvironmnet(Job job, String host, String port, boolean ssl, String APIKey) {
    backend = new Backend(host, port, ssl, APIKey);
}

public static void addResId(Job job, String resId) {
    Configuration conf = job.getConfiguration();
    String inputs = conf.get(INPUT_RES_IDS, "");

    if (inputs.isEmpty()) {
        inputs += resId;
    } else {
        inputs += "," + resId;
    }

    conf.set(INPUT_RES_IDS, inputs);
}

@Override
public List<InputSplit> getSplits(JobContext job) {
    // resulting splits container
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // get the Job configuration
    Configuration conf = job.getConfiguration();

    // get the inputs, i.e. the list of resource IDs
    String input = conf.get(INPUT_RES_IDS, "");
    String[] resIDs = StringUtils.split(input);

    // iterate on the resIDs
    for (String resID: resIDs) {
       splits.addAll(getSplitsResId(resID, job.getConfiguration()));
    }

    // return the splits
    return splits;
}

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    if (backend == null) {
        logger.info("Unable to create a MyRecordReader, it seems the environment was not properly set");
        return null;
    }

    // create a record reader
    return new MyRecordReader(backend, split, context);
}

# MyRecordReader.java

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    // get start, end and current positions
    MyInputSplit inputSplit = (MyInputSplit) this.split;
    start = inputSplit.getFirstRecordIndex();
    end = start + inputSplit.getLength();
    current = 0;

    // query the third-party system for the related resource, seeking to the start of the split
    records = backend.getRecords(inputSplit.getResId(), start, end);
}

# MapReduceTest.java

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new MapReduceTest(), args);
    System.exit(res);
}

@Override
public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = Job.getInstance(conf, "MapReduce test");
    job.setJarByClass(MapReduceTest.class);
    job.setMapperClass(MyMap.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(MyInputFormat.class);
    MyInputFormat.addResId(job, "ca73a799-9c71-4618-806e-7bd0ca1911f4");
    MyInputFormat.setEnvironmnet(job, "my.host.com", "443", true, "my_api_key");
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    return job.waitForCompletion(true) ? 0 : 1;
}

Any ideas about what is wrong?

BTW, which is the "right" InputSplit for the RecordReader to use: the one given to the constructor, or the one passed to the initialize method? Anyway, I've tried both options and the resulting error is the same :)

Solution

The way I read your stack trace, real is null on line 524.

But don't take my word for it. Slip an assert or a System.out.println in there and check the value of real yourself.
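
For instance, a minimal sketch of such a check, dropped into the createRecordReader from the question (the assertion message is illustrative, and remember asserts only fire when the JVM runs with -ea):

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    RecordReader<LongWritable, Text> reader =
            (backend == null) ? null : new MyRecordReader(backend, split, context);
    // Fail fast here instead of letting MapTask dereference null at its line 524
    System.out.println("createRecordReader returned: " + reader);
    assert reader != null : "reader is null; was setEnvironmnet ever called in this JVM?";
    return reader;
}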

NullPointerException almost always means you dotted off something you didn't expect to be null. Some libraries and collections will throw it at you as their way of saying "this can't be null".
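
A trivial illustration of "dotting off" a null:

// The canonical NPE shape: invoking any method on a null reference throws.
MyRecordReader reader = null;
reader.initialize(split, context);   // java.lang.NullPointerException here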

Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)

To me this reads as: in the org.apache.hadoop.mapred package the MapTask class has an inner class NewTrackingRecordReader with an initialize method that threw a NullPointerException at line 524.

524 real.initialize( blah, blah) // I actually stopped reading after the dot

this.real was set on line 491.

491 this.real = inputFormat.createRecordReader(split, taskContext);

Assuming you haven't left out any more closely scoped reals that are masking this.real, then we need to look at inputFormat.createRecordReader(split, taskContext). If this can return null, then it might be the culprit.

Turns out it will return null when backend is null.

@Override
public RecordReader<LongWritable, Text> createRecordReader(
    InputSplit split, 
    TaskAttemptContext context) {

    if (backend == null) {
        logger.info("Unable to create a MyRecordReader, " + 
                    "it seems the environment was not properly set");
        return null;
    }

    // create a record reader
    return new MyRecordReader(backend, split, context);
}

It looks like setEnvironmnet is supposed to set backend

# MyInputFormat.java

public static void setEnvironmnet(
    Job job, 
    String host, 
    String port, 
    boolean ssl, 
    String APIKey) {

    backend = new Backend(host, port, ssl, APIKey);
}

backend must be declared somewhere outside setEnvironmnet (or you'd be getting a compiler error).
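
Presumably the declaration looks something like this in MyInputFormat.java (a guess, since it isn't shown in the question):

// Static: one copy per class per JVM, not one per instance. An assignment
// made in one JVM (e.g. the driver) is invisible to every other JVM.
private static Backend backend;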

If backend hasn't been set to something non-null upon construction, and setEnvironmnet was not called before createRecordReader, then you should expect to get exactly the NullPointerException you got.

UPDATE:

As you've noted, since setEnvironmnet() is static backend must be static as well. This means that you must be sure other instances aren't setting it to null.
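
If you want to stop depending on a static altogether, one option (a sketch following the DBInputFormat/DBConfiguration pattern; the property names and the buildBackend helper are invented here) is to persist the connection parameters in the job Configuration, which does travel from the driver to the task JVMs, and rebuild the Backend on the task side:

public static void setEnvironmnet(Job job, String host, String port,
        boolean ssl, String APIKey) {
    // Store the environment in the Configuration instead of in a static field
    Configuration conf = job.getConfiguration();
    conf.set("myinputformat.host", host);
    conf.set("myinputformat.port", port);
    conf.setBoolean("myinputformat.ssl", ssl);
    conf.set("myinputformat.apikey", APIKey);
}

private static Backend buildBackend(Configuration conf) {
    return new Backend(conf.get("myinputformat.host"),
                       conf.get("myinputformat.port"),
                       conf.getBoolean("myinputformat.ssl", true),
                       conf.get("myinputformat.apikey"));
}

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    // Rebuilt from the task-side configuration, so backend can never be null here
    return new MyRecordReader(buildBackend(context.getConfiguration()), split, context);
}

This mirrors what addResId already does with INPUT_RES_IDS, and it removes any ordering requirement between setEnvironmnet and createRecordReader.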
