hadoop method to send output to multiple directories


Question


My MapReduce job processes data by date and writes its output to a particular folder structure. The current expectation is to generate output in the following structure:

2013
    01
    02
    ..

2012
    01
    02
    ..

etc.

At any time I get at most 12 months of data, so I am using the MultipleOutputs class to create 12 named outputs with the following function in the driver:

public void createOutputs(){
    Calendar c = Calendar.getInstance();
    String monthStr, pathStr;

    // Create multiple outputs for last 12 months
    // TODO make 12 configurable
    for(int i = 0; i < 12; ++i ){
        // Get the month and add 1, since Calendar months are 0-based
        int month = c.get(Calendar.MONTH) + 1;
        // Add a leading 0 for single-digit months (note: >= 10, so October-December are not padded)
        monthStr = month >= 10 ? "" + month : "0" + month;
        // Generate the named-output string in the format 201303etl
        pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
        // Add the named output
        MultipleOutputs.addNamedOutput(config, pathStr );  
        // Move to previous month
        c.add(Calendar.MONTH, -1); 
    }
}
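The naming loop above can be exercised outside Hadoop; here is a minimal standalone sketch (the class and method names are illustrative, not part of the original job):

```java
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;

public class MonthNameGenerator {
    // Generates named-output strings like "201307etl" for the last 12 months,
    // starting from the month of the given calendar and walking backwards.
    public static List<String> lastTwelveMonths(Calendar c) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 12; ++i) {
            // Calendar months are 0-based, so add 1
            int month = c.get(Calendar.MONTH) + 1;
            // Pad single-digit months with a leading 0 (>= 10 leaves Oct-Dec unpadded)
            String monthStr = month >= 10 ? "" + month : "0" + month;
            names.add(c.get(Calendar.YEAR) + "" + monthStr + "etl");
            // Step back one month
            c.add(Calendar.MONTH, -1);
        }
        return names;
    }
}
```

Seeding the calendar with, say, July 2013 yields the names 201307etl down through 201208etl, matching the 201307etl-r-00000 file seen in the log below.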

In the reducer, I added a cleanup function to move the generated output to appropriate directories.

protected void cleanup(Context context) throws IOException, InterruptedException {
        // Custom function to recursively process data
        moveFiles (FileSystem.get(new Configuration()), new Path("/MyOutputPath"));
}

Problem: the reducer's cleanup function runs before the output is moved from the _temporary directory to the output directory. Because of this, the function above sees no output at execution time, since all the data is still in the _temporary directory.

What is the best way for me to achieve the desired functionality? Appreciate any insights.

Thinking of the following:

  • Is there a way to use a custom OutputCommitter?
  • Is it better to chain another job, or is that overkill for this?
  • Is there a simpler alternative that I am just not aware of?

Here is the sample log of file structure from cleanup function:

MyMapReduce: filepath:hdfs://localhost:8020/dev/test
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000

Solution

You should not need a second job. I am currently using MultipleOutputs to create a ton of output directories in one of my programs. Despite there being upwards of 30 directories, I am able to use only a couple of MultipleOutputs objects. This is because you can set the output directory when you write, so it can be determined only when needed. You only actually need more than one named output if you want to output in different formats (e.g. one with key: Text.class, value: Text.class and one with key: Text.class, value: IntWritable.class).

Setup in the driver:

MultipleOutputs.addNamedOutput(job, "Output", TextOutputFormat.class, Text.class, Text.class);

Setup in the reducer:

mout = new MultipleOutputs<Text, Text>(context);

Calling mout in the reducer:

String key;            // set to whatever the output key will be
String value;          // set to whatever the output value will be
String outputFileName; // set to the absolute path this record should be written to

mout.write("Output",new Text(key),new Text(value),outputFileName);

A small piece of code can determine the directory at write time. For example, say you want to organize directories by month and year:

int year;//extract year from data
int month;//extract month from data
String baseFileName; //parent directory to all outputs from this job
String outputFileName = baseFileName + "/" + year + "/" + month;

mout.write("Output",new Text(key),new Text(value),outputFileName);
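One detail worth noting when building that path: the concatenation above produces unpadded months ("…/2013/1" rather than "…/2013/01"), and MultipleOutputs treats the last path component as a file-name prefix (appending a partition suffix such as "-r-00000" to it), so to get real month directories you likely want a padded path ending in an explicit prefix. A minimal sketch of that formatting (the class and method names are illustrative):

```java
public class OutputPathBuilder {
    // Builds a zero-padded per-month base path such as "Base/2013/03/part".
    // The trailing "part" becomes the file-name prefix that MultipleOutputs
    // extends with the partition suffix (e.g. "part-r-00000"), so the month
    // component stays a directory instead of becoming part of a file name.
    public static String monthlyPath(String baseFileName, int year, int month) {
        return String.format("%s/%04d/%02d/part", baseFileName, year, month);
    }
}
```

Passing the result of monthlyPath as the baseOutputPath argument of mout.write would then produce the directory layout shown in the EDIT below.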

Hope this helps.

EDIT: output file structure for above example:

Base
    2013
        01
        02
        03
        ...
    2012
        01
        ...
    ...
