MapReduce - 组合器

组合器,也称为半减速器,是一个可选类,它通过接受Map类的输入,然后将输出键值对传递给Reducer类来操作./p>

组合器的主要功能是用相同的键来汇总地图输出记录.组合器的输出(键值集合)将通过网络发送到实际的Reducer任务作为输入.

组合器

组合器在Map类和Reduce类之间使用class来减少Map和Reduce之间的数据传输量.通常,map任务的输出很大,传输到reduce任务的数据很高.

以下MapReduce任务图显示了COMBINER PHASE.

Combiner

Combiner如何工作?

这是一个简要介绍关于MapReduce Combiner如何工作和减去的总结;

  • 组合器没有预定义的接口,它必须实现Reducer接口的减少()方法.

  • 组合器对每个地图输出键进行操作.它必须具有与Reducer类相同的输出键值类型.

  • 组合器可以从大型数据集生成摘要信息,因为它替换了原始Map输出.

虽然,Combiner是可选的,但它有助于将数据分成多个组以进行Reduce阶段,这使得处理更容易.

MapReduce组合器实现

以下示例提供了有关组合器的理论概念.假设我们有一个名为 input.txt 的输入文本文件用于MapReduce.

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

重要阶段下面讨论使用Combiner的MapReduce程序.

记录阅读器

这是MapReduce的第一个阶段,其中记录阅读器从中读取每一行将文本文件作为文本输入,并将输出作为键值对.

输入 : 输入文件中的逐行文字.

输出 : 形成键值对.以下是预期键值对的集合.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

地图阶段

地图阶段从记录阅读器获取输入,处理它,并产生另一个输出一组键值对.

输入 : 以下键值对是从记录阅读器中获取的输入.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Map阶段读取每个键值对,使用StringTokenizer将每个单词与值分开,将每个单词视为键,将该单词的计数视为值.以下代码片段显示了Mapper类和map函数.

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
   
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) 
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

输出 : 预期输出如下&

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

组合器阶段

组合器阶段从Map阶段获取每个键值对,处理它并生成输出为键值集合对.

输入 : 以下键值对是从Map阶段获取的输入.

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

组合器阶段读取每个键值对,将常用词组合为键,将值组合为集合.通常,Combiner的代码和操作类似于Reducer的代码和操作.以下是Mapper,Combiner和Reducer类声明的代码片段.

 job.setMapperClass(TokenizerMapper.class); 
 job.setCombinerClass(IntSumReducer.class); 
 job.setReducerClass(IntSumReducer.class);

输出 : 预期输出如下<

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Reducer阶段

Reducer阶段从Combiner阶段获取每个键值集合对,处理它,以及将输出作为键值对传递.请注意,Combiner功能与Reducer相同.

输入 : 以下键值对是从组合器阶段获得的输入.

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Reducer阶段读取每个键值对.以下是Combiner的代码片段.

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
{
   private IntWritable result = new IntWritable();
   
   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
   {
      int sum = 0;
      for (IntWritable val : values) 
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

输出 :  Reducer阶段的预期输出如下:<

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Record Writer

这是MapReduce的最后一个阶段,其中Record Writer从中写入每个键值对Reducer阶段并以文本形式发送输出.

输入 :  Reducer阶段的每个键值对以及输出格式.

输出 : 它为您提供文本格式的键值对.以下是预期的输出.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

示例程序

以下代码块计算程序中的单词数.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) 
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }
   
   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
      {
         int sum = 0;
         for (IntWritable val : values) 
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }
   
   public static void main(String[] args) throws Exception 
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
		
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
		
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
		
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

将上述程序保存为 WordCount.java .下面给出了程序的编译和执行.

编译和执行

让我们假设我们在Hadoop用户的主目录中(对于例如,/home/hadoop).

按照下面给出的步骤编译并执行上述程序.

步骤1 : 使用以下命令创建一个目录来存储已编译的java类.

 
 $ mkdir units

第2步 : 下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序.您可以从 mvnrepository.com 下载jar.

让我们假设下载的文件夹是/home/hadoop/.

第3步 : 使用以下命令编译 WordCount.java 程序并为程序创建一个jar.

 
 $ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java 
 $ jar -cvf units.jar -C units/.

第4步 : 使用以下命令在HDFS中创建输入目录.

 
 $ HADOOP_HOME/bin/hadoop fs -mkdir input_dir

第5步 : 使用以下命令在HDFS的输入目录中复制名为 input.txt 的输入文件.

 
 $ HADOOP_HOME/bin/hadoop fs -put/home/hadoop/input.txt input_dir

第6步 : 使用以下命令验证输入目录中的文件.

 
 $ HADOOP_HOME/bin/hadoop fs -ls input_dir/

第7步 : 使用以下命令通过从输入目录获取输入文件来运行Word计数应用程序.

 
 $ HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

等待文件执行一段时间.执行后,输出包含许多输入拆分,Map任务和Reducer任务.

步骤8 : 使用以下命令验证输出文件夹中的结果文件.

 
 $ HADOOP_HOME/bin/hadoop fs -ls output_dir/

第9步 : 使用以下命令查看 Part-00000 文件中的输出.此文件由HDFS生成.

 
 $ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是MapReduce程序生成的输出.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1