Hadoop setInputPathFilter error


Problem description

I am using Hadoop 0.20.2 (that cannot be changed) and I want to add a filter to my input path. The data looks as follows:

/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2

and I only want to process all files with train in them.

A look at the FileInputFormat class suggests to use:

 FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)

and this is where my problem starts, since PathFilter is an interface. Of course, I could extend the interface, but then I would still not have an implementation, so instead I implemented it directly:

class TrainFilter implements PathFilter
{
   // accept() must be public to implement the PathFilter interface method
   public boolean accept(Path path)
   {
      return path.toString().contains("train");
   }
}

When I use TrainFilter as the PathFilter, the code compiles, but when I run it I get an exception because the input path is screwed up. Without the filter, my code runs through all files below /path1; with the filter set, it throws the error:

InvalidInputException: Input path does not exist: hdfs://localhost:9000/path1

Here is how I set it up in the driver code:

job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);

Any suggestions of what I am doing wrong here?

EDIT: I found the problem. The first call to the PathFilter is always the directory itself (/path1) and since it does not contain ("train"), the directory itself is invalid and thus the exception is thrown. Which brings me to another question: how can I test if an arbitrary path is a directory? For all I know, I need a reference to the FileSystem, which is not one of the default parameters of PathFilter.
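The decision rule the EDIT arrives at (directories must always pass so that /path1 itself is not rejected; files pass only when their name starts with "train") can be separated from the Hadoop wiring and sketched on its own. This is an illustrative sketch, not the asker's code: `TrainFilterRule`, `accept`, and the `isDirectory` parameter are made-up names, and `isDirectory` stands in for the `FileSystem` lookup (e.g. `fs.getFileStatus(path).isDir()`) that a real filter would need.

```java
public class TrainFilterRule {
    /**
     * Decide whether a path should be accepted.
     * Directories always pass, so the input directory itself (e.g. /path1)
     * is never rejected; files pass only when the last path component
     * starts with "train".
     */
    public static boolean accept(String pathName, boolean isDirectory) {
        if (isDirectory) {
            return true;
        }
        int slash = pathName.lastIndexOf('/');
        String name = slash >= 0 ? pathName.substring(slash + 1) : pathName;
        return name.startsWith("train");
    }
}
```

As for getting the `FileSystem` reference inside a real `PathFilter`: one commonly used pattern is to have the filter also implement `Configurable`; Hadoop instantiates path filters via `ReflectionUtils.newInstance`, which calls `setConf` on `Configurable` instances, and the filter can then call `FileSystem.get(conf)` in `setConf`.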

Solution

Alternatively, you may try to loop through all of the files in the given directory and check whether the file names begin with "train". E.g.:

        Job job = new Job(conf, "myJob");
        List<Path> inputPaths = new ArrayList<Path>();

        String basePath = "/user/hadoop/path";
        FileSystem fs = FileSystem.get(conf);
        // Expand the glob so that only the train* files become input paths
        FileStatus[] listStatus = fs.globStatus(new Path(basePath + "/train*"));
        for (FileStatus fstat : listStatus) {
            inputPaths.add(fstat.getPath());
        }

        FileInputFormat.setInputPaths(job,
                inputPaths.toArray(new Path[inputPaths.size()]));
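The same select-then-set pattern can be tried without a cluster using the JDK's own glob support in `java.nio.file`, which mirrors what `fs.globStatus(new Path(basePath + "/train*"))` does on HDFS. A minimal sketch; `GlobSelect` and `select` are illustrative names, not part of any Hadoop API:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class GlobSelect {
    // Collect the names of all entries directly under baseDir whose
    // file name matches the glob, e.g. select(dir, "train*").
    public static List<String> select(Path baseDir, String glob) throws IOException {
        List<String> matches = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(baseDir, glob)) {
            for (Path p : stream) {
                matches.add(p.getFileName().toString());
            }
        }
        return matches;
    }
}
```

The advantage of this glob-based approach over a `PathFilter` is that only the desired files are ever handed to the job, so the directory-versus-file problem from the question never arises.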
