Running two mappers and two reducers for a simple Hadoop MapReduce job


Problem Description


I just want to get a better understanding of using multiple mappers and reducers. I want to try this out with a simple Hadoop MapReduce word count job, and I want to run two mappers and two reducers for this wordcount job. Is there anything I need to configure manually in the configuration files, or is it enough to make changes to the WordCount.java file?

I'm running this job on a single node, and I'm running it as

$ hadoop jar job.jar input output

And I've started the daemons with

$ hadoop namenode -format
$ hadoop namenode

$ hadoop datanode

sbin$ ./yarn-daemon.sh start resourcemanager
sbin$ ./yarn-daemon.sh start resourcemanager

I'm running hadoop-2.0.0-cdh4.0.0

And my WordCount.java file is

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
private static final Log LOG = LogFactory.getLog(WordCount.class);

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      //printKeyAndValues(key, values);

      for (IntWritable val : values) {
        sum += val.get();
      LOG.info("val = " + val.get());
      }
      LOG.info("sum = " + sum + " key = " + key);
      result.set(sum);
      context.write(key, result);
      //System.err.println(String.format("[reduce] word: (%s), count: (%d)", key, result.get()));
    }


  // a little method to print debug output
    private void printKeyAndValues(Text key, Iterable<IntWritable> values)
    {
      StringBuilder sb = new StringBuilder();
      for (IntWritable val : values)
      {
        sb.append(val.get() + ", ");
      }
      System.err.println(String.format("[reduce] key: (%s), value: (%s)", key, sb.toString()));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Could anyone help me run two mappers and two reducers for this word count job?

Solution

Gladnick: If you are planning to use the default TextInputFormat, there will be at least as many mappers as the number of input files (or more, depending on the file sizes). So just put two files into your input directory so that you get two mappers running. (I'm advising this solution because you plan to run it as a test case.)
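For example (the file names below are placeholders, not from the original post), copying two small files into the job's input directory gives the job two input splits and therefore two map tasks:

$ hadoop fs -put file1.txt input/
$ hadoop fs -put file2.txt input/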

Since you have asked for two reducers, all you need to do is call job.setNumReduceTasks(2) in your main before submitting the job.
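Applied to the WordCount.java posted above, that is a one-line addition in main(); a minimal sketch of the relevant lines (placing it right after setReducerClass is just one reasonable spot):

    job.setReducerClass(IntSumReducer.class);
    job.setNumReduceTasks(2); // run two reduce tasks instead of the default single reducer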

After that, just prepare a jar of your application and run it in the Hadoop pseudo-distributed cluster.

If you need to control which words go to which reducer, you can specify that in a Partitioner class (a sketch of such a partitioner follows the driver code below).

        Configuration configuration = new Configuration();
        // create a configuration object that provides access to various
        // configuration parameters
        Job job = new Job(configuration, "Wordcount-Vowels & Consonants");
        // create the job object and set job name as Wordcount-Vowels &
        // Consonants
        job.setJarByClass(WordCount.class);
        // set the main class
        job.setNumReduceTasks(2);
        // set the number of reduce tasks required
        job.setMapperClass(WordCountMapper.class);
        // set the map class for the job
        job.setCombinerClass(WordCountCombiner.class);
        // set the combiner class for the job
        job.setPartitionerClass(VowelConsonantPartitioner.class);
        // set the partitioner class for the job
        job.setReducerClass(WordCountReducer.class);
        // set the reduce class for the job
        job.setOutputKeyClass(Text.class);
        // set the output type of key (the word) expected from the job, Text
        // analogous to String
        job.setOutputValueClass(IntWritable.class);
        // set the output type of value (the count) expected from the job,
        // IntWritable analogous to int
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // set the input directory for fetching the input files
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 

This should be the structure of your main program. You may include the combiner and the partitioner if needed.
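The driver above references a VowelConsonantPartitioner but does not show its body, so here is a minimal sketch of what such a partitioner could look like, assuming the intent is to send words starting with a vowel to one reducer and all other words to the other:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class VowelConsonantPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        if (numReduceTasks == 0) {
          return 0; // no reduce tasks configured; everything falls into partition 0
        }
        String word = key.toString().toLowerCase();
        // words starting with a vowel go to partition 0, everything else to partition 1
        boolean startsWithVowel = !word.isEmpty() && "aeiou".indexOf(word.charAt(0)) >= 0;
        return (startsWithVowel ? 0 : 1) % numReduceTasks;
      }
    }

With job.setNumReduceTasks(2) set as above, partitions 0 and 1 map directly to the two reduce tasks, so each reducer's output file contains one of the two groups of words.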
