MapReduce - 分区程序

分区程序的作用类似于处理输入数据集的条件.分区阶段发生在Map阶段之后和Reduce阶段之前.

分区器的数量等于reducer的数量.这意味着分区器将根据reducer的数量来划分数据.因此,从单个分区程序传递的数据由单个Reducer处理.

分区程序

分区程序对中间映射的键值对进行分区-outputs.它使用用户定义的条件对数据进行分区,该条件类似于散列函数.分区总数与作业的Reducer任务数相同.让我们举个例子来理解分区器是如何工作的.

MapReduce分区器实现

为了方便起见,我们假设我们有一个小的表名为Employee的表,包含以下数据.我们将使用此示例数据作为输入数据集来演示分区程序的工作原理.

Id姓名年龄性别薪水
1201gopal45男性50,000
1202manisha40女性50,000
1203khalil34男性30,000
1204prasanth30男性30,000
1205kiran2040,000
1206laxmi25女性35,000
1207bhavya2015,000
1208reshma19女性15,000
1209kranthi2222,000
1210Satish24男性25,000
1211Krishna2525,000
1212Arshad2820,000
1213lavanya188,000

我们必须编写一个应用程序来处理输入数据集,以便在不同年龄组中按性别查找受薪最高的员工(例如,低于20,在21到21之间) 30,ab ove 30).

输入数据

上述数据在input.txt  hadoop/hadoopPartitioner"目录并作为输入.

1201gopal45男性50000
1202manisha4051000
1203khaleel34男性30000
1204prasanth3031000
1205kiran20男性40000
1206laxmi25女性35000
1207bhavya2015000
1208reshma19女性14000
1209kranthi2222000
1210Satish24男性25000
1211Krishna25男性26000
1212Arshad28男性> 20000
1213lavanya18女性8000

根据给定的输入,以下是程序的算法说明.

地图任务

当我们在文本文件中包含文本数据时,地图任务接受键值对作为输入.此地图任务的输入如下 :

输入 : 关键是一个模式,如"任何特殊的键&加号; filename + 行号"(例如:key = @ input1),该值将是该行中的数据(例如:value = 1201 \t gopal \t 45 \t Male \t 50000).

方法 : 此地图任务的操作如下 :

  • 读取(记录数据) ),作为字符串中参数列表的输入值.

  • 使用split函数,将性别和存储分隔为字符串变量.

String[] str = value.toString().split("\t", -3);
String gender=str[3];

  • 将性别信息和记录数据发送为从地图任务输出键值对到分区任务.

 context.write(new Text(gender), new Text(value));

  • 对文本文件中的所有记录重复上述所有步骤.

输出 : 您将获得性别数据和记录数据值作为键值对.

分区程序任务

分区程序任务接受键值对从地图任务作为其输入.分区意味着将数据划分为段.根据给定的分区条件标准,输入的键值配对数据可以根据年龄标准分为三个部分.

输入 : 键值对集合中的整个数据.

key =记录中的性别字段值.

value =该性别的整个记录数据值.

方法 : 分区逻辑的过程如下:

  • 从输入键值对中读取年龄字段值.

String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);

  • 使用以下条件检查年龄值.

    • 年龄小于或等于20

    • 年龄大于20且小于或等于30.

    • 年龄大于30岁.

if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

输出 : 键值对的整个数据被分段为三个键值对集合. Reducer在每个集合上单独工作.

减少任务

分区任务的数量等于减速器任务的数量.这里我们有三个分区任务,因此我们有三个Reducer任务要执行.

输入 :  Reducer将使用不同的键值对集合执行三次.

key =记录中的性别字段值.

value =该性别的整个记录数据.

方法 : 以下逻辑将应用于每个集合.

  • 读取每条记录的Salary字段值.

String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.

  • 使用max变量检查薪水.如果str [4]是最高工资,则将str [4]指定为max,否则跳过该步骤.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}

  • 为每个密钥集合重复步骤1和2(男性和放大器;女性是关键的收藏品).执行这三个步骤后,您将从男性密钥集合中找到一个最高工资,从女性密钥集合中找到一个最高工资.

context.write(new Text(key),new IntWritable(max));

输出 : 最后,您将在三个不同年龄组的集合中获得一组键值对数据.它包含Male集合的最高工资和每个年龄组中Female集合的最高工资.

执行Map,Partitioner和Reduce任务后,三个集合键值对数据存储在三个不同的文件中作为输出.

所有这三个任务都被视为MapReduce作业.这些工作的以下要求和规格应在配置和减号中指定;

  • 工作名称

  • 键和值的输入和输出格式

  • Map,Reduce和Partitioner任务的各个类

Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

示例程序

以下程序显示如何在MapReduce程序中为给定条件实现分区程序./p>

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

将上述代码保存为"/home/hadoop/"中的 PartitionerExample.java hadoopPartitioner".下面给出了程序的编译和执行.

编译和执行

让我们假设我们在Hadoop用户的主目录中(例如,/home/hadoop).

按照下面给出的步骤编译并执行上述程序.

步骤1 : 下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序.您可以从 mvnrepository.com 下载jar.

让我们假设下载的文件夹是"/home/hadoop/hadoopPartitioner"

第2步 : 以下命令用于编译程序 PartitionerExample.java 并为程序创建jar.

 
 $ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java 
 $ jar -cvf PartitionerExample.jar -C.

第3步 : 使用以下命令在HDFS中创建输入目录.

 
 $ HADOOP_HOME/bin/hadoop fs -mkdir input_dir

第4步 : 使用以下命令在HDFS的输入目录中复制名为 input.txt 的输入文件.

 
 $ HADOOP_HOME/bin/hadoop fs -put/home/hadoop/hadoopPartitioner/input.txt input_dir

第5步 : 使用以下命令验证输入目录中的文件.

 
 $ HADOOP_HOME/bin/hadoop fs -ls input_dir/

第6步 : 使用以下命令通过从输入目录中获取输入文件来运行Top salary应用程序.

 
 $ HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

等待文件执行一段时间.执行后,输出包含许多输入拆分,映射任务和Reducer任务.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

第7步 : 使用以下命令验证输出文件夹中的结果文件.

 
 $ HADOOP_HOME/bin/hadoop fs -ls output_dir/

您将在三个文件中找到输出,因为您在程序中使用了三个分区器和三个Reducers.

步骤8 : 使用以下命令查看 Part-00000 文件中的输出.此文件由HDFS生成.

 
 $ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

零件00000中的输出

Female   15000
Male     40000

使用以下命令查看 Part-00001 文件中的输出.

 
 $ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

部分输出 - 00001

Female   35000
Male    31000

使用以下命令查看 Part-00002 文件中的输出.

 
 $ HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Part-00002中的输出

Female  51000
Male   50000