How to prevent a Hadoop job from failing on a corrupted input file


Problem Description

I'm running a Hadoop job on many input files, but if one of the files is corrupted, the whole job fails.

How can I make the job ignore the corrupted file? Maybe write some counter/error log for me, but don't fail the whole job.

Solution

It depends on where your job is failing - if a line is corrupt and an Exception is thrown somewhere in your map method, then you should be able to wrap the body of your map method in a try / catch and just log the error:

protected void map(LongWritable key, Text value, Context context) {
  try {
    // parse the value to an int
    int val = Integer.parseInt(value.toString());

    // do something with key and val..
  } catch (NumberFormatException nfe) {
    // log error and continue
  }
}
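
The question also asks for a counter or error log rather than silently dropping bad records. That part isn't shown in the answer above, but a minimal sketch of the same map method using Hadoop's built-in job counters might look like this (the counter group and name strings are arbitrary placeholders):

protected void map(LongWritable key, Text value, Context context) {
  try {
    // parse the value to an int
    int val = Integer.parseInt(value.toString());

    // do something with key and val..
  } catch (NumberFormatException nfe) {
    // count the bad record so it shows up in the job's counter output
    context.getCounter("Map Errors", "BAD_RECORDS").increment(1);
    // and/or write the offending line to the task log for later inspection
    System.err.println("Skipping unparseable record at offset " + key + ": " + value);
  }
}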

But if the error is thrown by your InputFormat's RecordReader then you'll need to amend the mapper's run(..) method - whose default implementation is as follows:

public void run(Context context) {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}

So you could amend this to try and catch the exception on the context.nextKeyValue() call, but you have to be careful about just ignoring any errors thrown by the reader - an IOException, for example, may not be 'skippable' simply by ignoring the error.

If you have written your own InputFormat / RecordReader, and you have a specific exception which denotes record failure but will allow you to skip over and continue parsing, then something like this will probably work:

public void run(Context context) {
  setup(context);
  while (true) {
    try {
      if (!context.nextKeyValue()) { 
        break;
      } else {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } catch (SkippableRecordException sre) {
      // log error
    }
  }
  cleanup(context);
}
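
Note that SkippableRecordException above is not a Hadoop class - it stands in for whatever exception your own RecordReader throws when it hits a bad record that it can safely skip past. A minimal placeholder, assuming you want it to fit the existing throws IOException signature of nextKeyValue(), could be:

public class SkippableRecordException extends IOException {
  public SkippableRecordException(String message, Throwable cause) {
    super(message, cause);
  }
}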

But just to reiterate - your RecordReader must be able to recover on error, otherwise the above code could send you into an infinite loop.

For your specific case - if you just want to ignore a file upon the first failure then you can update the run method to something much simpler:

public void run(Context context) {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  } catch (Exception e) {
    // log error
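    // note that cleanup(context) is inside the try block, so it does not run for a failed file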
  }
}
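
As a side note (not part of the original answer), whichever run(..) variant you pick lives in your own Mapper subclass and is wired into the job in the usual way; assuming a hypothetical class name of TolerantMapper:

Job job = Job.getInstance(new Configuration(), "tolerant job");
job.setJarByClass(TolerantMapper.class);
job.setMapperClass(TolerantMapper.class);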

Some final words of warning:

  • You need to make sure that it isn't your mapper code that is causing the exception to be thrown, otherwise you'll be ignoring files for the wrong reason.
  • Files that claim to be GZip compressed but aren't valid GZip will actually fail during initialization of the record reader - so the above will not catch this type of error (you'll need to write your own record reader implementation; see the sketch after this list). The same is true for any file error that is thrown during record reader creation.
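
To illustrate that last point, here is a rough sketch (not from the original answer) of what a tolerant record reader could look like: a TextInputFormat variant whose reader swallows failures during initialize() and on read errors, so a corrupt file simply produces no further records. The class names are made up, and in practice you would log and count the failures rather than dropping them silently:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class TolerantTextInputFormat extends TextInputFormat {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new TolerantLineRecordReader();
  }

  static class TolerantLineRecordReader extends RecordReader<LongWritable, Text> {
    private final LineRecordReader delegate = new LineRecordReader();
    private boolean broken = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
      try {
        delegate.initialize(split, context);
      } catch (Exception e) {
        // e.g. a corrupt gzip header - treat the whole split as empty
        broken = true;
      }
    }

    @Override
    public boolean nextKeyValue() throws IOException {
      if (broken) {
        return false;
      }
      try {
        return delegate.nextKeyValue();
      } catch (Exception e) {
        // a read error mid-file - stop consuming the rest of this split
        broken = true;
        return false;
      }
    }

    @Override
    public LongWritable getCurrentKey() {
      return delegate.getCurrentKey();
    }

    @Override
    public Text getCurrentValue() {
      return delegate.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException {
      return broken ? 1.0f : delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }
}

It would be registered with job.setInputFormatClass(TolerantTextInputFormat.class). Bear in mind that this silently discards everything after the first read error in a file, so pair it with a counter or log line if you need visibility into how much data was skipped.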
