组合器,也称为半减速器,是一个可选类,它通过接受Map类的输入,然后将输出键值对传递给Reducer类来操作./p>
组合器的主要功能是用相同的键来汇总地图输出记录.组合器的输出(键值集合)将通过网络发送到实际的Reducer任务作为输入.
组合器在Map类和Reduce类之间使用class来减少Map和Reduce之间的数据传输量.通常,map任务的输出很大,传输到reduce任务的数据很高.
以下MapReduce任务图显示了COMBINER PHASE.
这是一个简要介绍关于MapReduce Combiner如何工作和减去的总结;
组合器没有预定义的接口,它必须实现Reducer接口的减少()方法.
组合器对每个地图输出键进行操作.它必须具有与Reducer类相同的输出键值类型.
组合器可以从大型数据集生成摘要信息,因为它替换了原始Map输出.
虽然,Combiner是可选的,但它有助于将数据分成多个组以进行Reduce阶段,这使得处理更容易.
以下示例提供了有关组合器的理论概念.假设我们有一个名为 input.txt 的输入文本文件用于MapReduce.
What do you mean by Object What do you know about Java What is Java Virtual Machine How Java enabled High Performance
重要阶段下面讨论使用Combiner的MapReduce程序.
这是MapReduce的第一个阶段,其中记录阅读器从中读取每一行将文本文件作为文本输入,并将输出作为键值对.
输入 : 输入文件中的逐行文字.
输出 : 形成键值对.以下是预期键值对的集合.
<1, What do you mean by Object> <2, What do you know about Java> <3, What is Java Virtual Machine> <4, How Java enabled High Performance>
地图阶段从记录阅读器获取输入,处理它,并产生另一个输出一组键值对.
输入 : 以下键值对是从记录阅读器中获取的输入.
<1, What do you mean by Object> <2, What do you know about Java> <3, What is Java Virtual Machine> <4, How Java enabled High Performance>
Map阶段读取每个键值对,使用StringTokenizer将每个单词与值分开,将每个单词视为键,将该单词的计数视为值.以下代码片段显示了Mapper类和map函数.
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
输出 : 预期输出如下&
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1> <What,1> <do,1> <you,1> <know,1> <about,1> <Java,1> <What,1> <is,1> <Java,1> <Virtual,1> <Machine,1> <How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
组合器阶段从Map阶段获取每个键值对,处理它并生成输出为键值集合对.
输入 : 以下键值对是从Map阶段获取的输入.
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1> <What,1> <do,1> <you,1> <know,1> <about,1> <Java,1> <What,1> <is,1> <Java,1> <Virtual,1> <Machine,1> <How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
组合器阶段读取每个键值对,将常用词组合为键,将值组合为集合.通常,Combiner的代码和操作类似于Reducer的代码和操作.以下是Mapper,Combiner和Reducer类声明的代码片段.
job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class);
输出 : 预期输出如下<
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1> <know,1> <about,1> <Java,1,1,1> <is,1> <Virtual,1> <Machine,1> <How,1> <enabled,1> <High,1> <Performance,1>
Reducer阶段从Combiner阶段获取每个键值集合对,处理它,以及将输出作为键值对传递.请注意,Combiner功能与Reducer相同.
输入 : 以下键值对是从组合器阶段获得的输入.
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1> <know,1> <about,1> <Java,1,1,1> <is,1> <Virtual,1> <Machine,1> <How,1> <enabled,1> <High,1> <Performance,1>
Reducer阶段读取每个键值对.以下是Combiner的代码片段.
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
输出 : Reducer阶段的预期输出如下:<
<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1> <know,1> <about,1> <Java,3> <is,1> <Virtual,1> <Machine,1> <How,1> <enabled,1> <High,1> <Performance,1>
这是MapReduce的最后一个阶段,其中Record Writer从中写入每个键值对Reducer阶段并以文本形式发送输出.
输入 : Reducer阶段的每个键值对以及输出格式.
输出 : 它为您提供文本格式的键值对.以下是预期的输出.
What 3 do 2 you 2 mean 1 by 1 Object 1 know 1 about 1 Java 3 is 1 Virtual 1 Machine 1 How 1 enabled 1 High 1 Performance 1
以下代码块计算程序中的单词数.
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
将上述程序保存为 WordCount.java .下面给出了程序的编译和执行.
让我们假设我们在Hadoop用户的主目录中(对于例如,/home/hadoop).
按照下面给出的步骤编译并执行上述程序.
步骤1 : 使用以下命令创建一个目录来存储已编译的java类.
$ mkdir units
第2步 : 下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序.您可以从 mvnrepository.com 下载jar.
让我们假设下载的文件夹是/home/hadoop/.
第3步 : 使用以下命令编译 WordCount.java 程序并为程序创建一个jar.
$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java $ jar -cvf units.jar -C units/.
第4步 : 使用以下命令在HDFS中创建输入目录.
$ HADOOP_HOME/bin/hadoop fs -mkdir input_dir
第5步 : 使用以下命令在HDFS的输入目录中复制名为 input.txt 的输入文件.
$ HADOOP_HOME/bin/hadoop fs -put/home/hadoop/input.txt input_dir
第6步 : 使用以下命令验证输入目录中的文件.
$ HADOOP_HOME/bin/hadoop fs -ls input_dir/
第7步 : 使用以下命令通过从输入目录获取输入文件来运行Word计数应用程序.
$ HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待文件执行一段时间.执行后,输出包含许多输入拆分,Map任务和Reducer任务.
步骤8 : 使用以下命令验证输出文件夹中的结果文件.
$ HADOOP_HOME/bin/hadoop fs -ls output_dir/
第9步 : 使用以下命令查看 Part-00000 文件中的输出.此文件由HDFS生成.
$ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下是MapReduce程序生成的输出.
What 3 do 2 you 2 mean 1 by 1 Object 1 know 1 about 1 Java 3 is 1 Virtual 1 Machine 1 How 1 enabled 1 High 1 Performance 1