Hadoop: NullPointerException with Custom InputFormat

Question
I've developed a custom InputFormat for Hadoop (including a custom InputSplit and a custom RecordReader) and I'm experiencing a rare NullPointerException.

These classes are going to be used for querying a third-party system which exposes a REST API for record retrieval. Thus, I took inspiration from DBInputFormat, which is a non-HDFS InputFormat as well.
The error I get is the following:

Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:762)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
I've searched the code of MapTask (Hadoop version 2.1.0) and I've seen the problematic part is the initialization of the RecordReader:
472   NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
473       org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
474       TaskReporter reporter,
475       org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
476       throws InterruptedException, IOException {
...
491     this.real = inputFormat.createRecordReader(split, taskContext);
...
494   }
...
519   @Override
520   public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
521       org.apache.hadoop.mapreduce.TaskAttemptContext context
522       ) throws IOException, InterruptedException {
523     long bytesInPrev = getInputBytes(fsStats);
524     real.initialize(split, context);
525     long bytesInCurr = getInputBytes(fsStats);
526     fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
527   }
Of course, the relevant parts of my code:

# MyInputFormat.java
public static void setEnvironmnet(Job job, String host, String port, boolean ssl, String APIKey) {
    backend = new Backend(host, port, ssl, APIKey);
}

public static void addResId(Job job, String resId) {
    Configuration conf = job.getConfiguration();
    String inputs = conf.get(INPUT_RES_IDS, "");
    if (inputs.isEmpty()) {
        inputs += resId;
    } else {
        inputs += "," + resId;
    }
    conf.set(INPUT_RES_IDS, inputs);
}
@Override
public List<InputSplit> getSplits(JobContext job) {
    // container for the resulting splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // get the Job configuration
    Configuration conf = job.getConfiguration();
    // get the inputs, i.e. the list of resource IDs
    String input = conf.get(INPUT_RES_IDS, "");
    String[] resIDs = StringUtils.split(input);
    // iterate on the resIDs
    for (String resID : resIDs) {
        splits.addAll(getSplitsResId(resID, job.getConfiguration()));
    }
    // return the splits
    return splits;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    if (backend == null) {
        logger.info("Unable to create a MyRecordReader, it seems the environment was not properly set");
        return null;
    }
    // create a record reader
    return new MyRecordReader(backend, split, context);
}
# MyRecordReader.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    // get start, end and current positions
    MyInputSplit inputSplit = (MyInputSplit) this.split;
    start = inputSplit.getFirstRecordIndex();
    end = start + inputSplit.getLength();
    current = 0;
    // query the third-party system for the related resource, seeking to the start of the split
    records = backend.getRecords(inputSplit.getResId(), start, end);
}
# MapReduceTest.java
public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new MapReduceTest(), args);
    System.exit(res);
}

@Override
public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = Job.getInstance(conf, "MapReduce test");
    job.setJarByClass(MapReduceTest.class);
    job.setMapperClass(MyMap.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(MyInputFormat.class);
    MyInputFormat.addInput(job, "ca73a799-9c71-4618-806e-7bd0ca1911f4");
    InputFormat.setEnvironmnet(job, "my.host.com", "443", true, "my_api_key");
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    return job.waitForCompletion(true) ? 0 : 1;
}
Any ideas about what is wrong? BTW, which is the "good" InputSplit the RecordReader must use, the one given to the constructor or the one given in the initialize method? Anyway I've tried both options and the resulting error is the same :)
The way I read your stack trace, real is null on line 524.

But don't take my word for it. Slip an assert or System.out.println in there and check the value of real yourself.
A NullPointerException almost always means you dotted off something you didn't expect to be null. Some libraries and collections will throw it at you as their way of saying "this can't be null".

Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
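The "dotting off null" failure mode is easy to reproduce in isolation, together with the kind of quick check suggested above. A minimal stand-alone sketch (plain Java, no Hadoop types; the names are made up for illustration):

```java
public class NpeDemo {
    interface Reader { void initialize(); }

    // never assigned, like a record reader the framework failed to create
    static Reader real = null;

    // a defensive check before the dereference, instead of letting
    // the NullPointerException surface deep inside framework code
    static String check() {
        if (real == null) {
            return "real is null";
        }
        real.initialize();
        return "initialized";
    }

    public static void main(String[] args) {
        System.out.println(check());
        try {
            real.initialize(); // dotting off null throws right at the dot
        } catch (NullPointerException e) {
            System.out.println("NPE at the dereference, as expected");
        }
    }
}
```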
To me this reads as: in the org.apache.hadoop.mapred package, the MapTask class has an inner class NewTrackingRecordReader with an initialize method that threw a NullPointerException at line 524.

524 real.initialize( blah, blah) // I actually stopped reading after the dot

this.real was set on line 491.

491 this.real = inputFormat.createRecordReader(split, taskContext);
Assuming you haven't left out any more closely scoped reals that are masking this.real, then we need to look at inputFormat.createRecordReader(split, taskContext); if this can return null then it might be the culprit.

Turns out it will return null when backend is null.
@Override
public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split,
        TaskAttemptContext context) {
    if (backend == null) {
        logger.info("Unable to create a MyRecordReader, " +
                    "it seems the environment was not properly set");
        return null;
    }
    // create a record reader
    return new MyRecordReader(backend, split, context);
}
It looks like setEnvironmnet is supposed to set backend:

# MyInputFormat.java
public static void setEnvironmnet(
        Job job,
        String host,
        String port,
        boolean ssl,
        String APIKey) {
    backend = new Backend(host, port, ssl, APIKey);
}
backend must be declared somewhere outside setEnvironmnet (or you'd be getting a compiler error).
If backend hasn't been set to something non-null upon construction, and setEnvironmnet was not called before createRecordReader, then you should expect to get exactly the NullPointerException you got.

UPDATE: As you've noted, since setEnvironmnet() is static, backend must be static as well. This means that you must be sure other instances aren't setting it to null.
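Note also that in a distributed run, createRecordReader executes inside a separate task JVM, where a static field set in the client JVM is simply not visible. The usual remedy is to store the connection settings in the job Configuration (which Hadoop serializes and ships to every task) and rebuild the backend on the task side. A stand-alone sketch of that pattern, with a plain Map standing in for Hadoop's Configuration and all key names made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfBackedSetup {
    // driver side: record the settings in the configuration instead of
    // assigning a static field that only exists in this JVM
    static void setEnvironment(Map<String, String> conf,
                               String host, String port,
                               boolean ssl, String apiKey) {
        conf.put("my.backend.host", host);
        conf.put("my.backend.port", port);
        conf.put("my.backend.ssl", Boolean.toString(ssl));
        conf.put("my.backend.apikey", apiKey);
    }

    // task side: rebuild the backend from the shipped configuration,
    // failing fast if the driver never set it up
    static String createBackend(Map<String, String> conf) {
        String host = conf.get("my.backend.host");
        if (host == null) {
            throw new IllegalStateException("environment was not set");
        }
        return host + ":" + conf.get("my.backend.port")
             + " ssl=" + conf.get("my.backend.ssl");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        setEnvironment(conf, "my.host.com", "443", true, "my_api_key");
        System.out.println(createBackend(conf));
    }
}
```

In real Hadoop code the same idea would use conf.set(...) in setEnvironmnet and conf.get(...) inside createRecordReader, mirroring how the question's addResId already passes the resource IDs through the configuration.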