Hadoop ChainMapper, ChainReducer


Problem description


I'm relatively new to Hadoop and trying to figure out how to programmatically chain jobs (multiple mappers, reducers) with ChainMapper, ChainReducer. I've found a few partial examples, but not a single complete and working one.

My current test code is

public class ChainJobs extends Configured implements Tool {

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

public static class Map2 extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken().concat("Justatest"));
            output.collect(word, one);
        }
    }
}

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

@Override
public int run(String[] args) throws IOException {

    Configuration conf = getConf();
    JobConf job = new JobConf(conf);

    job.setJobName("TestforChainJobs");
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobConf map1Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, map1Conf);

    JobConf map2Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, map2Conf);

    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

    JobClient.runJob(job);
    return 0;

}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainJobs(), args);
    System.exit(res);
}
}

But it fails with

MapAttempt TASK_TYPE="MAP" TASKID="task_201210162337_0009_m_000000" TASK_ATTEMPT_ID="attempt_201210162337_0009_m_000000_0" TASK_STATUS="FAILED" FINISH_TIME="1350397216365" HOSTNAME="localhost.localdomain" ERROR="java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:389)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:327)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)

Any hints or a very simple working example much appreciated.
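
For reference, the driver in the old-API ChainMapper/ChainReducer javadoc is set up somewhat more fully than the run() method above: it registers the job jar, sets the input/output formats, and declares the final output key/value classes on the top-level JobConf before the chain elements are added. The sketch below only adapts that documented pattern to the Map, Map2 and Reduce classes from this question (TextInputFormat/TextOutputFormat from org.apache.hadoop.mapred would additionally need to be imported); it is an untested illustration, not a confirmed fix for the reflection error.

// Untested sketch of run(), following the old-API ChainMapper/ChainReducer javadoc pattern;
// it reuses the Map, Map2 and Reduce classes and the org.apache.hadoop.mapred.* imports above.
@Override
public int run(String[] args) throws IOException {

    JobConf job = new JobConf(getConf(), ChainJobs.class);   // also registers the job jar
    job.setJobName("TestforChainJobs");
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobConf map1Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, map1Conf);

    JobConf map2Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, map2Conf);

    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

    JobClient.runJob(job);
    return 0;
}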

Solution


I have coded a wordcount job based on a chained mapper. The code is written against the new API and it works well :)

import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//implementing CHAIN MAPREDUCE without using custom format




//SPLIT MAPPER
class SplitMapper extends Mapper<Object,Text,Text,IntWritable>
{
    private IntWritable dummyValue=new IntWritable(1);
    //private String content;
    private String tokens[];
    @Override
    public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
        tokens=value.toString().split(" ");
        for(String x:tokens)
        {
        context.write(new Text(x), dummyValue);
        }
    }   
}




//UPPER CASE MAPPER
class UpperCaseMapper extends Mapper<Text,IntWritable,Text,IntWritable>
{
    @Override
    public void map(Text key,IntWritable value,Context context)throws IOException,InterruptedException{
        String val=key.toString().toUpperCase();
        Text newKey=new Text(val);
        context.write(newKey, value);
    }
}



//ChainMapReducer
class ChainMapReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
    @Override
    public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
        //keep the running total local so each key's count starts from zero
        int sum=0;
        for(IntWritable value:values)
        {
            sum+=value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
public class FirstClass extends Configured implements Tool{
    static Configuration cf;
    public int run (String args[])throws IOException,InterruptedException,ClassNotFoundException{
        cf=new Configuration();

        //bypassing the GenericOptionsParser part and directly running into job declaration part
        Job j=Job.getInstance(cf);

        /**************CHAIN MAPPER AREA STARTS********************************/
        Configuration splitMapConfig=new Configuration(false);
        //below we add the 1st mapper class under ChainMapper Class
        ChainMapper.addMapper(j, SplitMapper.class, Object.class, Text.class, Text.class, IntWritable.class, splitMapConfig);

        //configuration for second mapper
        Configuration upperCaseConfig=new Configuration(false);
        //below we add the 2nd mapper, the upper-case mapper, to the ChainMapper class
        ChainMapper.addMapper(j, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, upperCaseConfig);
        /**************CHAIN MAPPER AREA FINISHES********************************/

        //now proceeding with the normal delivery
        j.setJarByClass(FirstClass.class);
        j.setCombinerClass(ChainMapReducer.class);
        //register the same class as the reducer so the final counts are summed per key
        j.setReducerClass(ChainMapReducer.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
        Path p=new Path(args[1]);

        //set the input and output URI
        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, p);
        p.getFileSystem(cf).delete(p, true);
        return j.waitForCompletion(true)?0:1;
    }
    public static void main(String args[])throws Exception{
        int res=ToolRunner.run(new Configuration(), new FirstClass(), args);
        System.exit(res);
    }
}

Part of the output is shown below:

A       619
ACCORDING       636
ACCOUNT 638
ACROSS? 655
ADDRESSES       657
AFTER   674
AGGREGATING,    687
AGO,    704
ALL     721
ALMOST  755
ALTERING        768
AMOUNT  785
AN      819
ANATOMY 820
AND     1198
ANXIETY 1215
ANY     1232
APACHE  1300
APPENDING       1313
APPLICATIONS    1330
APPLICATIONS.   1347
APPLICATIONS.�        1364
APPLIES 1381
ARCHITECTURE,   1387
ARCHIVES        1388
ARE     1405
AS      1422
BASED   1439

You might see some special or unwanted characters since I have not done any cleansing to remove punctuation; I have focused only on the working of the chain mapper. Thanks :)
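
If the stray punctuation ever needed to be removed, one more mapper could simply be chained in front of the reduce, and the reducer itself could be registered through the new-API ChainReducer (org.apache.hadoop.mapreduce.lib.chain.ChainReducer), which also covers the ChainReducer half of the original question. The snippet below is only an illustrative sketch on top of the answer above: CleanseMapper is a hypothetical class, and the two driver calls would go into run() after the existing ChainMapper.addMapper calls.

//hypothetical cleansing mapper: strips non-alphanumeric characters from each token
class CleanseMapper extends Mapper<Text,IntWritable,Text,IntWritable>
{
    @Override
    public void map(Text key,IntWritable value,Context context)throws IOException,InterruptedException{
        String cleaned=key.toString().replaceAll("[^A-Za-z0-9]","");
        if(!cleaned.isEmpty()){
            context.write(new Text(cleaned), value);
        }
    }
}

//inside run(), after the two existing ChainMapper.addMapper(...) calls
//(requires: import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;)
ChainMapper.addMapper(j, CleanseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));

//register the reducer through ChainReducer instead of setReducerClass;
//further mappers could then be chained after the reduce with ChainReducer.addMapper
ChainReducer.setReducer(j, ChainMapReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));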
