How to override the default sorting of Hadoop


Problem description


I have a MapReduce job in which the keys are numbers from 1 to 200. My intended output was (number, value) pairs in numerical order, but I am getting the output as:

1    value
10   value
11   value
   :
   : 
2    value
20   value
   :
   :
3    value

I know this is due to the default behavior of MapReduce, which sorts keys in ascending order.

I want my keys sorted in numerical order. How can I achieve this?

Solution

The default WritableComparator in the MapReduce framework would normally handle the numerical ordering if the key were an IntWritable. I suspect your job is emitting a Text key, which results in the lexicographical ordering you are seeing. Please have a look at the sample code below, which uses an IntWritable key to emit the values:
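
As a quick aside (not part of the original answer), here is a plain-Java sketch of why the two key types sort differently: Text keys compare byte-by-byte, just like Strings, while IntWritable keys compare numerically.

import java.util.Arrays;

public class SortOrderDemo {
    public static void main(String[] args) {
        // Text keys compare byte-by-byte, like Strings
        String[] asText = {"1", "10", "11", "2", "20", "3"};
        Arrays.sort(asText);
        System.out.println(Arrays.toString(asText)); // [1, 10, 11, 2, 20, 3]

        // IntWritable keys compare numerically, like ints
        int[] asInts = {1, 10, 11, 2, 20, 3};
        Arrays.sort(asInts);
        System.out.println(Arrays.toString(asInts)); // [1, 2, 3, 10, 11, 20]
    }
}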

1) Mapper Implementation

package com.stackoverflow.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SourceFileMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    private static final String DEFAULT_DELIMITER = "\t";

    private IntWritable keyToEmit = new IntWritable();
    private Text valueToEmit = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each line is expected as "number<TAB>value"; split once and reuse the fields
        String[] fields = value.toString().split(DEFAULT_DELIMITER);
        keyToEmit.set(Integer.parseInt(fields[0]));
        valueToEmit.set(fields[1]);
        context.write(keyToEmit, valueToEmit);
    }

}
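
Note that this mapper assumes every input line has the form number<TAB>value. A line without the delimiter would make fields[1] throw an ArrayIndexOutOfBoundsException, and a non-numeric first field would throw a NumberFormatException, so guard the parse if the input is not guaranteed clean.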

2) Reducer Implementation

package com.stackoverflow.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SourceFileReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException,
            InterruptedException {
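        // Identity reduce: emit each value unchanged; keys arrive here already sorted by the framework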
        for (Text value : values) {
            context.write(key, value);
        }
    }

}
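
Because the driver below sets a single reducer (job.setNumReduceTasks(1)), all keys flow through one reduce task and the final output file is globally sorted. With multiple reducers each part file would still be internally sorted, but you would need something like a TotalOrderPartitioner to get one global order across files.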

3) Driver Implementation

package com.stackoverflow.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SourceFileDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Create configuration
        Configuration conf = new Configuration(true);

        // Create job
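        // (In newer Hadoop APIs, Job.getInstance(conf, "SourceFileDriver") replaces this deprecated constructor)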
        Job job = new Job(conf, "SourceFileDriver");
        job.setJarByClass(SourceFileDriver.class);

        // Setup MapReduce
        job.setMapperClass(SourceFileMapper.class);
        job.setReducerClass(SourceFileReducer.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete output if exists
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute job
        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);

    }

}
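
As a further aside, if for some reason you must keep Text keys, you can override the shuffle's sort order instead. The sketch below is my own illustration rather than part of the original answer: the class name NumericTextComparator is hypothetical, and it assumes every key parses as an integer.

package com.stackoverflow.answers.mapreduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical comparator: orders Text keys by their numeric value instead of byte order
public class NumericTextComparator extends WritableComparator {

    public NumericTextComparator() {
        super(Text.class, true); // true = deserialize keys so compare() receives Text instances
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        // Assumes both keys parse as integers; a malformed key would throw NumberFormatException
        return Integer.compare(Integer.parseInt(a.toString()), Integer.parseInt(b.toString()));
    }
}

Register it in the driver with job.setSortComparatorClass(NumericTextComparator.class) before submitting the job; setSortComparatorClass is the standard hook for replacing the comparator used in the sort phase.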

Thank you!
