MapReduce是一个框架,用于编写应用程序,以可靠的方式处理大型商用硬件集群上的大量数据.本章将指导您使用Java在Hadoop框架中运行MapReduce.
通常MapReduce范例基于发送map-reduce程序实际数据所在的计算机.
在MapReduce作业期间,Hadoop将Map和Reduce任务发送到相应的服务器中.群集.
该框架管理数据传递的所有细节,如发布任务,验证任务完成以及在节点之间复制群集周围的数据.
大多数计算都发生在节点上,本地磁盘上的数据可以减少网络流量.
完成给定任务后,群集会收集并减少数据以形成适当的结果,并将其发送回Hadoop服务器.
MapReduce框架在键值对上运行,也就是说,框架将作业的输入视为一组键值对并生成一组键值对作为作业的输出,可以想象为不同类型.
键和值类必须由框架序列化,因此需要实现Writable接口.此外,关键类必须实现WritableComparable接口以便于按框架进行排序.
MapReduce作业的输入和输出格式都是键值对 :
(输入)< k1,v1> - >地图 - > < k2,v2> - >减少 - > < k3,v3> (输出).
输入 | 输出 | |
---|---|---|
Map | < ; k1,v1> | list(< k2,v2>) |
Reduce | < k2,list(v2)> | 列表(< k3,v3>) |
下表显示了有关组织电力消耗的数据.该表包括每月电力消耗和连续五年的年平均值.
Jan | 2月 | 3月 | 4月 | May | Jun | 7月 | 8月 | 9月 | 10月 | 11月 | 12月 | Avg | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
我们需要编写应用程序来处理给定表中的输入数据,以查找最大使用年份,即最小年份用法,等等.对于具有有限数量记录的程序员来说,这个任务很容易,因为他们只需编写逻辑来生成所需的输出,并将数据传递给书面应用程序.
现在让我们提高输入数据的比例.假设我们必须分析特定州所有大型工业的电力消耗.当我们编写处理此类批量数据的应用程序时,
他们将花费大量时间执行.
当我们将数据从源移动到网络服务器时,网络流量会很大.
要解决这些问题,我们有MapReduce框架.
以上数据保存为样本.txt 并作为输入提供.输入文件如下所示.
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
示例数据的以下程序使用MapReduce框架.
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable, /*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line,"\t"); String year = s.nextToken(); while(s.hasMoreTokens()){ lasttoken=s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg=30; int val=Integer.MIN_VALUE; while (values.hasNext()) { if((val=values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(Eleunits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
将上述程序保存到 ProcessUnits.java 中.下面给出了程序的编译和执行.
让我们假设我们在Hadoop的主目录中user(例如/home/hadoop).
按照下面给出的步骤编译并执行上述程序.
步骤1 : 使用以下命令创建一个目录来存储已编译的java类.
$ mkdir units
第2步 : 下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序.从 mvnrepository.com 下载jar.我们假设下载文件夹是/home/hadoop/.
第3步 : 以下命令用于编译 ProcessUnits.java 程序并为程序创建jar.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/.
第4步 : 以下命令用于在HDFS中创建输入目录.
$ HADOOP_HOME/bin/hadoop fs -mkdir input_dir
第5步 : 以下命令用于复制HDFS输入目录中名为 sample.txt 的输入文件.
$ HADOOP_HOME/bin/hadoop fs -put/home/hadoop/sample.txt input_dir
第6步 : 以下命令用于验证输入目录中的文件
$ HADOOP_HOME/bin/hadoop fs -ls input_dir/
第7步 : 以下命令用于通过从输入目录获取输入文件来运行Eleunit_max应用程序.
$ HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待文件执行一段时间.执行后,输出包含许多输入拆分,Map任务,Reducer任务等.
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=61 FILE: Number of bytes written=279400 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=546 HDFS: Number of bytes written=40 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=146137 Total time spent by all reduces in occupied slots (ms)=441 Total time spent by all map tasks (ms)=14613 Total time spent by all reduce tasks (ms)=44120 Total vcore-seconds taken by all map tasks=146137 Total vcore-seconds taken by all reduce tasks=44120 Total megabyte-seconds taken by all map tasks=149644288 Total megabyte-seconds taken by all reduce tasks=45178880 Map-Reduce Framework Map input records=5 Map output records=5 Map output bytes=45 Map output materialized bytes=67 Input split bytes=208 Combine input records=5 Combine output records=5 Reduce input groups=5 Reduce shuffle bytes=6 Reduce input records=5 Reduce output records=5 Spilled Records=10 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=948 CPU time spent (ms)=5160 Physical memory (bytes) snapshot=47749120 Virtual memory (bytes) snapshot=2899349504 Total committed heap usage (bytes)=277684224 File Output Format Counters Bytes Written=40
第8步 : 以下命令用于验证输出文件夹中的结果文件.
$ HADOOP_HOME/bin/hadoop fs -ls output_dir/
第9步 : 以下命令用于查看 Part-00000 文件中的输出.此文件由HDFS生成.
$ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下是MapReduce程序生成的输出 :
1981 | 34 |
1984 | 40 |
1985 | 45 |
第10步 : 以下命令用于将输出文件夹从HDFS复制到本地文件系统.
$ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir/home/hadoop