Hadoop multiple inputs


Question


I am using Hadoop MapReduce and I want to combine two files. My first MapReduce iteration gives me a file with an ID and a number on each line, like this:

A 30
D 20

My goal is to use the ID from that file to associate it with another file, and to produce another output with a trio, ID, Name, Number, like this:

A ABC 30
D EFGH 20

But I am not sure whether MapReduce is the best way to do this. Would it be better, for example, to read the second input file with a file reader and look up the name by ID? Or can I do it with MapReduce?

If so, I'm trying to find out how. I tried a MultipleInputs solution:

MultipleInputs.addInputPath(job2, new Path(args[1]+"-tmp"),
    TextInputFormat.class, FlightsByCarrierMapper2.class);
MultipleInputs.addInputPath(job2, new Path("inputplanes"),
    TextInputFormat.class, FlightsModeMapper.class); 

But I can't think of a way to combine the two and get the output I want. What I have right now just gives me a list like this:

A ABC
A 30
B ABCD
C ABCDEF
D EFGH
D 20

After my last reduce I am getting this:

N125DL  767-332
N125DL  7   , 
N126AT  737-76N
N126AT  19  , 
N126DL  767-332
N126DL  1   , 
N127DL  767-332
N127DL  7   , 
N128DL  767-332
N128DL  3

I want this: N127DL 7 767-332. Also, I don't want the records that do not combine.

And this is my reduce class:

public class FlightsByCarrierReducer2 extends Reducer<Text, Text, Text, Text> {

String merge = "";
protected void reduce(Text token, Iterable<Text> values, Context context) 
                            throws IOException, InterruptedException {

    int i = 0;  
    for(Text value:values)
    {
        if(i == 0){
            merge = value.toString()+",";
        }
        else{
            merge += value.toString();
        }
        i++;
    }

        context.write(token, new Text(merge));

}

}

Update:

This is the dataset I'm using: http://stat-computing.org/dataexpo/2009/the-data.html

Using TailNum and Cancelled (which is 1 or 0), I'm trying to get the model name that corresponds to each TailNum. My model file has TailNum, Model, and other fields. My current output is:

N193JB ERJ 190-100 IGW
N194DN 767-332
N19503 EMB-135ER
N19554 EMB-145LR
N195DN 767-332
N195DN 2

First comes the key, then the model; for keys that have cancelled flights, the cancellation count appears below the model.

I would like a trio, Key, Model, Number of Cancelled, because I want the number of cancellations per model.

Solution

You can join them using the ID as the key for both mappers. You can write your map task something like this:

public void map(LongWritable k, Text value, Context context)
        throws IOException, InterruptedException {
    // Split the line to separate the ID from the rest.
    // For "A 30":           word1 = "A", word2 = "30"
    // Likewise for "A ABC": word1 = "A", word2 = "ABC"
    String[] parts = value.toString().split(" ");
    Text word1 = new Text(parts[0]);
    Text word2 = new Text(parts[1]);
    context.write(word1, word2);
}

I think you can reuse the same map task for both inputs. Then write a common reducer; the Hadoop framework groups the data by key, so you will get the ID as the key. You can cache one of the values and then concatenate.

Text valEmit = new Text();
String merge = "";

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    int i = 0;
    for (Text value : values) {
        if (i == 0) {
            merge = value.toString() + ",";
        } else {
            merge += value.toString();
        }
        i++;
    }
    valEmit.set(merge);
    context.write(key, valEmit);
}
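
Note that Hadoop makes no guarantee about the order in which values reach the reducer, so assuming the first value is always the number (or always the name) can break; the ReducerJoin in the update below sidesteps this by inspecting each value's content rather than its position.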

Finally, you can write your driver class:

public int run(String[] args) throws Exception {
    Configuration c = new Configuration();
    String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
    Path p1 = new Path(files[0]);
    Path p2 = new Path(files[1]);
    Path p3 = new Path(files[2]);
    FileSystem fs = FileSystem.get(c);
    if (fs.exists(p3)) {
        fs.delete(p3, true);
    }
    Job job = new Job(c, "Multiple Job");
    job.setJarByClass(MultipleFiles.class);
    MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
    MultipleInputs.addInputPath(job, p2, TextInputFormat.class, MultipleMap2.class);
    job.setReducerClass(MultipleReducer.class);
    // ...
}
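
The omitted lines cover the rest of the job setup, the output key/value classes, the output path, and the call that submits the job; the complete Driver in the update below spells them out.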

You can find the example HERE

Hope this helps.


UPDATE

Input1

A 30
D 20

Input2

A ABC
D EFGH

Output

A ABC 30
D EFGH 20

Mapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author sreeveni
 *
 */
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
    Text keyEmit = new Text();
    Text valEmit = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String parts[] = line.split(" ");
        keyEmit.set(parts[0]);
        valEmit.set(parts[1]);
        context.write(keyEmit, valEmit);
    }
}
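
Note that this single Mapper1 serves both inputs: each file has exactly two space-separated fields with the ID first, so the same split-and-emit logic applies to either path, and the driver below simply registers it twice.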

Reducer.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author sreeveni
 *
 */
public class ReducerJoin extends Reducer<Text, Text, Text, Text> {

    Text valEmit = new Text();
    String merge = "";

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String character = "";
        String number = "";
        for (Text value : values) {
            // ordering output
            String val = value.toString();
            char myChar = val.charAt(0);

            if (Character.isDigit(myChar)) {
                number = val;
            } else {
                character = val;
            }
        }
        merge = character + " " + number;
        valEmit.set(merge);
        context.write(key, valEmit);
    }

}
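
ReducerJoin tells the two sides apart with Character.isDigit, which works here because the numbers always begin with a digit and the names never do. If that assumption could fail, a common alternative is to tag each value with its source. The following is a minimal sketch of my own, not part of the original answer, and the class names are hypothetical: each input path gets its own mapper, which prefixes the value with a marker the reducer then strips.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical mapper for the numbers file ("A 30"): tags values with "N:".
class NumberMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(" ");
        context.write(new Text(parts[0]), new Text("N:" + parts[1]));
    }
}

// Hypothetical mapper for the names file ("A ABC"): tags values with "M:".
class NameMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(" ");
        context.write(new Text(parts[0]), new Text("M:" + parts[1]));
    }
}

// Joins on the tags instead of guessing from the content.
class TaggedJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String name = null;
        String number = null;
        for (Text v : values) {
            String s = v.toString();
            if (s.startsWith("N:")) {
                number = s.substring(2);
            } else if (s.startsWith("M:")) {
                name = s.substring(2);
            }
        }
        // Emit only keys present in both inputs (an inner join), which also
        // drops the records "which do not combine".
        if (name != null && number != null) {
            context.write(key, new Text(name + " " + number));
        }
    }
}

Registering NumberMapper and NameMapper with MultipleInputs.addInputPath, one per path, and setting TaggedJoinReducer as the reducer gives the same A ABC 30 output while also dropping the unmatched records, which the question asks for.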

Driver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author sreeveni
 *
 */
public class Driver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        // checking the arguments count

        if (args.length != 3) {
            System.err
                    .println("Usage : <inputlocation>  <inputlocation>  <outputlocation> ");
            System.exit(0);
        }
        int res = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(res);

    }

    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        String source1 = args[0];
        String source2 = args[1];
        String dest = args[2];
        Configuration conf = new Configuration();
        conf.set("mapred.textoutputformat.separator", " "); // changing default
                                                            // delimiter to user
                                                            // input delimiter
        FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Multiple Jobs");

        job.setJarByClass(Driver.class);
        Path p1 = new Path(source1);
        Path p2 = new Path(source2);
        Path out = new Path(dest);
        MultipleInputs.addInputPath(job, p1, TextInputFormat.class,
                Mapper1.class);
        MultipleInputs.addInputPath(job, p2, TextInputFormat.class,
                Mapper1.class);
        job.setReducerClass(ReducerJoin.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        /*
         * delete if exist
         */
        if (fs.exists(out))
            fs.delete(out, true);

        TextOutputFormat.setOutputPath(job, out);
        boolean success = job.waitForCompletion(true);

        return success ? 0 : 1;
    }

}
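
To run it, you would package the classes into a jar and pass three arguments, for example hadoop jar join.jar Driver input1 input2 output (the jar name and paths here are placeholders). The driver checks the argument count and deletes the output directory if it already exists before submitting the job.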
