Hadoop多个输入 [英] Hadoop multiple inputs
问题描述
我正在使用hadoop map reduce,我想计算两个文件。我的第一个Map / Reduce迭代给了我一个带有双ID号码的文件,如下所示:
A 30
D 20
我的目标是使用该文件中的ID与另一个文件相关联,并有另一个输出与三人:ID,号码,姓名,如下所示:
A ABC 30
D EFGH 20
但我不确定使用Map Reduce是否是最好的方法。例如使用文件读取器读取第二个输入文件并通过ID获取名称会更好吗?或者我可以用Map Reduce做到吗?
如果是这样,我试图找出如何。我尝试了一个MultipleInput解决方案:
MultipleInputs.addInputPath(job2, new Path(args[1] + "-tmp"),
TextInputFormat.class, FlightsByCarrierMapper2.class);
MultipleInputs.addInputPath(job2,new Path(inputplanes),
TextInputFormat.class,FlightsModeMapper.class);
但我想不出任何解决方案来组合这两者并获得我想要的输出。我现在的方式就是给我这个例子:
A ABC
A 30
B ABCD
C ABCDEF
D EFGH
D 20
在我的最后减少后,我得到这个:
N125DL 767-332
N125DL 7,
N126AT 737-76N
N126AT 19,
N126DL 767-332
N126DL 1,
N127DL 767-332
N127DL 7,
N128DL 767-332
N128DL 3
我想要:N127DL 7 767-332。而且,我不想要那些不合并的。
这是我的reduce类: b
$ b
public class FlightsByCarrierReducer2 extends Reducer {
String merge = "";
protected void reduce(Text token,Iterable< Text> values,Context context)
throws IOException,InterruptedException {
int i = 0;
for(Text value:values)
{
if(i == 0){
merge = value.toString()+,;
}
else {
merge + = value.toString();
}
i ++;
}
context.write(token,new Text(merge));
$ b 更新:
http://stat-computing.org/dataexpo/2009/the-data.html 这就是我正在使用的示例。
我试着用:TailNum和Canceled,它是(1或0)获得对应于TailNum的模型名称。我的模型文件有一个TailNumb,模型和其他东西。我当前的输出是:
N193JB ERJ 190-100 IGW
N194DN 767-332
N19503 EMB-135ER
N19554 EMB-145LR
N195DN 767 -332
N195DN 2
第一个关键是第二个模型,模型下面的模型
我想要一个三重奏键,取消的模型数,因为我希望每个模型的取消数
解决方案您可以使用ID作为两个mapper的键来加入它们。
您可以编写您的地图任务,如下所示:
public void map(LongWritable k,Text value,Context context )throws IOException,InterruptedException
{
//获取行
//分割行以获取ID分隔符
// word1 = A
// word2 = 30
//同样为A ABC
// word1 = A
// word2 = ABC
context.write(word1,word2);
}
我认为您可以重复使用相同的Map任务。
然后编写一个常用的Reducer作业,Hadoop Framework在关键基础上对数据进行分组。
所以你将能够获得ID作为关键。
您可以缓存一个值然后concat。
String merge =;
public void reduce(Text key,Iterable< Text> values,Context context)
{
int i = 0;
for(Text value:values)
{
if(i == 0){
merge = value.toString()+,;
}
else {
merge + = value.toString();
}
i ++;
}
valEmit.set(merge);
context.write(key,valEmit);
}
最后,您可以编写您的Driver类
public int run(String [] args)throws Exception {
Configuration c = new Configuration();
String [] files = new GenericOptionsParser(c,args).getRemainingArgs();
路径p1 =新路径(文件[0]);
路径p2 =新路径(文件[1]);
路径p3 =新路径(文件[2]);
FileSystem fs = FileSystem.get(c);
if(fs.exists(p3)){
fs.delete(p3,true);
}
作业作业=新作业(c,多作业);
job.setJarByClass(MultipleFiles.class);
MultipleInputs.addInputPath(job,p1,TextInputFormat.class,MultipleMap1.class);
MultipleInputs.addInputPath(job,p2,TextInputFormat.class,MultipleMap2.class);
job.setReducerClass(MultipleReducer.class);
。
。
}
您可以找到示例这里
希望这会有所帮助。
更新
Input1
A 30
D 20
Input2
A ABC
D EFGH
输出
A ABC 30
D EFGH 20
Mapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
$ b $ **
*作者sreeveni
*
* /
公共类Mapper1扩展了Mapper< LongWritable,Text,Text,Text> {
Text keyEmit = new Text();
文本valEmit = new Text();
$ b $ public void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException {
String line = value.toString();
String parts[] = line.split(" ");
keyEmit.set(parts [0]);
valEmit.set(parts [1]);
context.write(keyEmit,valEmit);
}
}
Reducer.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/ **
* @author sreeveni
*
* /
公共类ReducerJoin扩展了Reducer<文本,文本,文本,文本> {
文本valEmit = new Text();
String merge =;
$ b $ public void reduce(Text key,Iterable< Text> values,Context context)
throws IOException,InterruptedException {
String character = "";
String number = "";
for(Text value:values){
//订单输出
String val = value.toString();
char myChar = val.charAt(0);
if(Character.isDigit(myChar)){
number = val;
} else {
character = val;
}
}
merge = character + " " + number;
valEmit.set(merge);
context.write(key,valEmit);
}
}
Driver class
import org.apache.hadoop.conf.Configuration;
导入org.apache.hadoop.conf.Configured;
导入org.apache.hadoop.fs.FileSystem;
导入org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
$ b $ **
* @author sreeveni
*
* /
公共类驱动程序扩展Configured implements工具{
public static void main (String [] args)抛出Exception {
// TODO自动生成的方法存根
//检查参数count
$ b $ if(args.length!= 3){
System.err
.println(Usage:< inputlocation>< inputlocation>< outputlocation>);
System.exit(0);
}
int res = ToolRunner.run(new Configuration(),new Driver(),args);
System.exit(res);
$ b @Override
public int run(String [] args)throws Exception {
// TODO自动生成的方法存根
String source1 = args [0];
String source2 = args [1];
String dest = args [2];
Configuration conf = new Configuration();
conf.set(mapred.textoutputformat.separator,); //将默认
//分隔符更改为用户
//输入分隔符
FileSystem fs = FileSystem.get(conf);
工作职位=新职位(conf,多个职位);
job.setJarByClass(Driver.class);
路径p1 =新路径(source1);
路径p2 =新路径(source2);
Path out = new Path(dest);
MultipleInputs.addInputPath(job,p1,TextInputFormat.class,
Mapper1.class);
MultipleInputs.addInputPath(job,p2,TextInputFormat.class,
Mapper1.class);
job.setReducerClass(ReducerJoin.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
$ b $ * b $ b *如果存在则删除
* /
if(fs.exists(out))
fs.delete(out,true);
TextOutputFormat.setOutputPath(job,out);
布尔成功= job.waitForCompletion(true);
返回成功? 0:1;
}
}
I am using hadoop map reduce and I want to compute two files. My first Map/Reduce iteration is giving me an a file with a pair ID number like this:
A 30
D 20
My goal is to use that ID from the file to associate with another file and have another output with a trio: ID, Number, Name, like this:
A ABC 30
D EFGH 20
But I am not sure whether using Map Reduce is the best way to do this. Would it be better for example to use a File Reader to Read the second input file and get the Name by ID? Or can I do it with Map Reduce?
If so, I'm trying to find out how. I tried a MultipleInput solution:
MultipleInputs.addInputPath(job2, new Path(args[1]+"-tmp"),
TextInputFormat.class, FlightsByCarrierMapper2.class);
MultipleInputs.addInputPath(job2, new Path("inputplanes"),
TextInputFormat.class, FlightsModeMapper.class);
But I can't think of any solution to combine the two and get the output I want. The way I have right now is just giving me the list like this example:
A ABC
A 30
B ABCD
C ABCDEF
D EFGH
D 20
After my Last Reduce I am getting this:
N125DL 767-332
N125DL 7 ,
N126AT 737-76N
N126AT 19 ,
N126DL 767-332
N126DL 1 ,
N127DL 767-332
N127DL 7 ,
N128DL 767-332
N128DL 3
I want this: N127DL 7 767-332. And also, I don't want the ones which do not combine.
And this is my reduce class:
public class FlightsByCarrierReducer2 extends Reducer {
String merge = "";
protected void reduce(Text token, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int i = 0;
for(Text value:values)
{
if(i == 0){
merge = value.toString()+",";
}
else{
merge += value.toString();
}
i++;
}
context.write(token, new Text(merge));
}
}
Update:
http://stat-computing.org/dataexpo/2009/the-data.html this is the example I'm using.
I'm trying with: TailNum and Cancelled which is (1 or 0) get the model name that corresponds to the TailNum. My file with model has a TailNumb, Model and other stuff. My current output is:
N193JB ERJ 190-100 IGW
N194DN 767-332
N19503 EMB-135ER
N19554 EMB-145LR
N195DN 767-332
N195DN 2
First comes the key, then the model; the keys that have cancelled flights appear below the model.
And I would like a trio Key,Model Number of Cancelled, Because I want number of Cancellations per model
解决方案 You can join them using ID as key for both mapper.
You can write your map task as something like this
// Sketch of the shared map task for the reduce-side join: split each line
// into the join key (word1) and the payload (word2) and emit the pair, so
// the shuffle groups records from BOTH files under the same ID.
public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException
{
//Get the line
//split the line to get the ID separate
//word1 = A
//word2 = 30
//Likewise for A ABC
//word1 = A
//word2 = ABC
// NOTE(review): word1/word2 are placeholders, not declared here — in real
// code derive them from value.toString().split(...) before writing.
context.write(word1, word2);
}
I think you can reuse the same Map task.
And then write a common Reducer job where the Hadoop Framework groups data on a key basis.
So you will be able to get ID as key.
And You can cache one of the value and then concat.
// Running accumulator for the joined value.
// NOTE(review): as an instance field this carries state between keys if a
// group is empty — a local variable inside reduce() would be safer.
String merge = "";

// Concatenates the values grouped under one ID: the first value seen is
// cached with a trailing comma, the remaining ones are appended after it.
public void reduce(Text key, Iterable<Text> values, Context context)
{
int i =0;
for(Text value:values)
{
if(i == 0){
merge = value.toString()+","; // first value for this key
}
else{
merge += value.toString(); // record from the other input file
}
i++;
}
// NOTE(review): valEmit is assumed to be a Text field declared on the
// enclosing reducer class — confirm before copying this snippet.
valEmit.set(merge);
context.write(key, valEmit);
}
Finally you can write your Driver class
// Driver sketch: wires two input paths to two mappers via MultipleInputs
// and a single join reducer. (The trailing dots elide the remaining job
// configuration — output classes, waitForCompletion, etc.)
public int run(String[] args) throws Exception {
Configuration c=new Configuration();
// files[0] and files[1] are the two inputs, files[2] is the output dir.
String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
Path p1=new Path(files[0]);
Path p2=new Path(files[1]);
Path p3=new Path(files[2]);
FileSystem fs = FileSystem.get(c);
// Remove a stale output directory so the job can be rerun.
if(fs.exists(p3)){
fs.delete(p3, true);
}
Job job = new Job(c,"Multiple Job");
job.setJarByClass(MultipleFiles.class);
// Each input path gets its own mapper class; both emit (ID, value) pairs
// so the shuffle joins them by ID.
MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class);
job.setReducerClass(MultipleReducer.class);
.
.
}
You can find the example HERE
Hope this helps.
UPDATE
Input1
A 30
D 20
Input2
A ABC
D EFGH
Output
A ABC 30
D EFGH 20
Mapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @author sreeveni
*
*/
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
Text keyEmit = new Text();
Text valEmit = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String parts[] = line.split(" ");
keyEmit.set(parts[0]);
valEmit.set(parts[1]);
context.write(keyEmit, valEmit);
}
}
Reducer.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* @author sreeveni
*
*/
public class ReducerJoin extends Reducer<Text, Text, Text, Text> {
Text valEmit = new Text();
String merge = "";
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String character = "";
String number = "";
for (Text value : values) {
// ordering output
String val = value.toString();
char myChar = val.charAt(0);
if (Character.isDigit(myChar)) {
number = val;
} else {
character = val;
}
}
merge = character + " " + number;
valEmit.set(merge);
context.write(key, valEmit);
}
}
Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author sreeveni
*
*/
/**
 * Driver for the two-file reduce-side join: both inputs go through
 * {@link Mapper1} (same "&lt;id&gt; &lt;value&gt;" line shape) and
 * {@link ReducerJoin} stitches the grouped values back together per ID.
 *
 * Usage: &lt;inputlocation&gt; &lt;inputlocation&gt; &lt;outputlocation&gt;
 *
 * @author sreeveni
 */
public class Driver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Validate the argument count before handing off to ToolRunner.
        if (args.length != 3) {
            System.err
                    .println("Usage : <inputlocation> <inputlocation> <outputlocation> ");
            // BUG FIX: exit nonzero on a usage error. The original exited 0,
            // which made scripted callers believe the job had succeeded.
            System.exit(1);
        }
        int res = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String source1 = args[0];
        String source2 = args[1];
        String dest = args[2];
        Configuration conf = new Configuration();
        // Change the key/value separator of the text output from the
        // default tab to a space, matching the desired "A ABC 30" format.
        conf.set("mapred.textoutputformat.separator", " ");
        FileSystem fs = FileSystem.get(conf);
        // NOTE(review): `new Job(conf, ...)` is deprecated in Hadoop 2.x —
        // switch to Job.getInstance(conf, ...) once the cluster allows it.
        Job job = new Job(conf, "Multiple Jobs");
        job.setJarByClass(Driver.class);
        Path p1 = new Path(source1);
        Path p2 = new Path(source2);
        Path out = new Path(dest);
        // Both inputs use the SAME mapper because both files share the
        // "<id> <value>" line shape; the shuffle joins them by id.
        MultipleInputs.addInputPath(job, p1, TextInputFormat.class,
                Mapper1.class);
        MultipleInputs.addInputPath(job, p2, TextInputFormat.class,
                Mapper1.class);
        job.setReducerClass(ReducerJoin.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Delete a pre-existing output directory so the job can be rerun.
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        TextOutputFormat.setOutputPath(job, out);
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
这篇关于Hadoop多个输入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!