MapReduce - Hadoop实现

MapReduce是一个框架,用于编写应用程序,以可靠的方式处理大型商用硬件集群上的大量数据.本章将指导您使用Java在Hadoop框架中运行MapReduce.

MapReduce算法

通常MapReduce范例基于发送map-reduce程序实际数据所在的计算机.

  • 在MapReduce作业期间,Hadoop将Map和Reduce任务发送到相应的服务器中.群集.

  • 该框架管理数据传递的所有细节,如发布任务,验证任务完成以及在节点之间复制群集周围的数据.

  • 大多数计算都发生在节点上,本地磁盘上的数据可以减少网络流量.

  • 完成给定任务后,群集会收集并减少数据以形成适当的结果,并将其发送回Hadoop服务器.

MapReduce算法

输入和输出(Java透视)

MapReduce框架在键值对上运行,也就是说,框架将作业的输入视为一组键值对并生成一组键值对作为作业的输出,可以想象为不同类型.

键和值类必须由框架序列化,因此需要实现Writable接口.此外,关键类必须实现WritableComparable接口以便于按框架进行排序.

MapReduce作业的输入和输出格式都是键值对 :

(输入)< k1,v1> - &GT;地图 - > < k2,v2>  - >减少 - > < k3,v3> (输出).


输入输出
Map&lt ; k1,v1>list(< k2,v2>)
Reduce< k2,list(v2)>列表(< k3,v3>)

MapReduce实施

下表显示了有关组织电力消耗的数据.该表包括每月电力消耗和连续五年的年平均值.


Jan2月3月4月MayJun7月8月9月10月11月12月Avg
19792323243242526262626252625
198026272828283031313130303029
198131323232333435363634343434
198439383939394142434039383840
198538393939394141410040393945

我们需要编写应用程序来处理给定表中的输入数据,以查找最大使用年份,即最小年份用法,等等.对于具有有限数量记录的程序员来说,这个任务很容易,因为他们只需编写逻辑来生成所需的输出,并将数据传递给书面应用程序.

现在让我们提高输入数据的比例.假设我们必须分析特定州所有大型工业的电力消耗.当我们编写处理此类批量数据的应用程序时,

  • 他们将花费大量时间执行.

  • 当我们将数据从源移动到网络服务器时,网络流量会很大.

要解决这些问题,我们有MapReduce框架.

输入数据

以上数据保存为样本.txt 并作为输入提供.输入文件如下所示.

1979 23232432425262626262526 25
1980 262728282830313131303030 29
1981 313232323334353636343434 34
1984 393839393941424340393838 40
1985 383939393941414100403939 45

示例程序

示例数据的以下程序使用MapReduce框架.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

将上述程序保存到 ProcessUnits.java 中.下面给出了程序的编译和执行.

ProcessUnits程序的编译和执行

让我们假设我们在Hadoop的主目录中user(例如/home/hadoop).

按照下面给出的步骤编译并执行上述程序.

步骤1 : 使用以下命令创建一个目录来存储已编译的java类.

$ mkdir units

第2步 : 下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序.从 mvnrepository.com 下载jar.我们假设下载文件夹是/home/hadoop/.

第3步 : 以下命令用于编译 ProcessUnits.java 程序并为程序创建jar.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/.

第4步 : 以下命令用于在HDFS中创建输入目录.

$ HADOOP_HOME/bin/hadoop fs -mkdir input_dir

第5步 : 以下命令用于复制HDFS输入目录中名为 sample.txt 的输入文件.

$ HADOOP_HOME/bin/hadoop fs -put/home/hadoop/sample.txt input_dir

第6步 : 以下命令用于验证输入目录中的文件

$ HADOOP_HOME/bin/hadoop fs -ls input_dir/

第7步 : 以下命令用于通过从输入目录获取输入文件来运行Eleunit_max应用程序.

$ HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

等待文件执行一段时间.执行后,输出包含许多输入拆分,Map任务,Reducer任务等.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

第8步 : 以下命令用于验证输出文件夹中的结果文件.

$ HADOOP_HOME/bin/hadoop fs -ls output_dir/

第9步 : 以下命令用于查看 Part-00000 文件中的输出.此文件由HDFS生成.

$ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是MapReduce程序生成的输出 :

198134
198440
198545

第10步 : 以下命令用于将输出文件夹从HDFS复制到本地文件系统.

$ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir/home/hadoop