Hadoop MapReduce sort reduce output using the key


Problem Description

Down below there is a map-reduce program counting the words of several text files. My aim is to have the result in descending order by the number of appearances.

Unfortunately, the program sorts the output lexicographically by the key. I want a natural ordering of the integer values.

So I added a custom comparator with job.setSortComparatorClass(IntComparator.class), but it doesn't work as expected. I'm getting the following exception:

java.lang.Exception: java.nio.BufferUnderflowException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:498)
    at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:355)
    at WordCount$IntComparator.compare(WordCount.java:128)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:987)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:100)
    at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:64)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1277)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1174)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:609)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:675)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

Any help would be appreciated! :)

I've listed the whole program below, as there may be a cause for the exception that I'm simply not seeing. As you can see, I'm using the new mapreduce API (org.apache.hadoop.mapreduce.*).

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Counts the words in several text files.
 */
public class WordCount {
  /**
   * Maps lines of text to (word, amount) pairs.
   */
  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text();
    private IntWritable amount = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String textLine = value.toString();

      StringTokenizer tokenizer = new StringTokenizer(textLine);
      while (tokenizer.hasMoreElements()) {
        word.set((String) tokenizer.nextElement());

        context.write(word, amount);
      }
    }

  }

  /**
   * Reduces (word, amount) pairs to (amount, word) list.
   */
  public static class Reduce extends
      Reducer<Text, IntWritable, IntWritable, Text> {

    private IntWritable amount = new IntWritable();
    private int sum;

    @Override
    protected void reduce(Text key, Iterable<IntWritable> valueList,
        Context context) throws IOException, InterruptedException {
      sum = 0;

      for (IntWritable value : valueList) {
        sum += value.get();
      }

      amount.set(sum);
      context.write(amount, key);
    }
  }

  public static class IntComparator extends WritableComparator {
    public IntComparator() {
      super(IntWritable.class);
    }

    private Integer int1;
    private Integer int2;

    @Override
    public int compare(byte[] raw1, int offset1, int length1, byte[] raw2,
        int offset2, int length2) {
      int1 = ByteBuffer.wrap(raw1, offset1, length1).getInt();
      int2 = ByteBuffer.wrap(raw2, offset2, length2).getInt();

      return int2.compareTo(int1);
    }

  }

  /**
   * Job configuration.
   * 
   * @param args
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  public static void main(String[] args) throws IOException,
      ClassNotFoundException, InterruptedException {
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    Configuration configuration = new Configuration();
    configuration.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
    Job job = new Job(configuration);
    job.setJobName("WordCount");
    job.setJarByClass(WordCount.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setSortComparatorClass(IntComparator.class);

    FileInputFormat.setInputPaths(job, inputPath);

    FileSystem.get(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);

    job.waitForCompletion(true);
  }
}

Answer

The comparator step occurs between the Mapper and the Reducer, so it won't work for you here: you only swap the key and value around inside the Reducer itself, after that sort has already happened.
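To see why the byte-level compare blows up: a Text key serializes as a vint length followed by the UTF-8 bytes, so a one-letter word occupies just two bytes, while ByteBuffer.getInt() demands four. A minimal standalone illustration (hypothetical, not part of the program above):

import java.nio.ByteBuffer;

public class UnderflowDemo {
  public static void main(String[] args) {
    // The Text key "a" serializes to two bytes: the vint length 0x01,
    // then the UTF-8 byte for 'a'.
    byte[] serializedTextKey = { 0x01, 'a' };

    // getInt() needs four bytes but only two are available, which is
    // exactly the BufferUnderflowException from the stack trace.
    ByteBuffer.wrap(serializedTextKey, 0, 2).getInt();
  }
}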

The default WritableComparator would normally handle your numerical ordering if the key were an IntWritable; here, however, it receives a Text key, which results in lexicographic ordering.
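For comparison, a minimal sketch of a comparator that sorts descending, assuming the map output key actually were an IntWritable (which it is not in the program above):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/** Sorts IntWritable map output keys in descending order. */
public class DescendingIntComparator extends WritableComparator {
  public DescendingIntComparator() {
    // 'true' tells the base class to deserialize the raw bytes into
    // IntWritable instances before calling compare() below, so no
    // hand-rolled byte parsing is needed.
    super(IntWritable.class, true);
  }

  @Override
  @SuppressWarnings({ "rawtypes", "unchecked" })
  public int compare(WritableComparable a, WritableComparable b) {
    return -a.compareTo(b); // reverse the natural ascending order
  }
}

Letting WritableComparator handle the deserialization sidesteps the BufferUnderflowException entirely, since the base class understands the key's actual wire format.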

As to why the final output isn't sorted by the IntWritable key you write out: the framework never re-sorts what a reducer emits; TextOutputFormat simply writes records in the order they arrive. So, in short, setting the sort comparator won't help you here, I'm afraid.
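A common way out (the usual chained-job pattern, not something spelled out in the answer above) is a second job: have the first job write its (word, count) pairs to a SequenceFile, then swap each pair with InverseMapper so the count becomes the key, and sort descending with a comparator like the one sketched above. A rough sketch; the intermediate path and method name are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortByCount {
  /**
   * Hypothetical second driver; assumes the first job wrote its
   * (Text word, IntWritable count) output to 'intermediate' as a
   * SequenceFile.
   */
  public static void runSortJob(Configuration conf, Path intermediate,
      Path output) throws Exception {
    Job sortJob = new Job(conf);
    sortJob.setJobName("WordCountSort");
    sortJob.setJarByClass(SortByCount.class);

    // Read the (word, count) pairs back with their Writable types intact.
    sortJob.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.addInputPath(sortJob, intermediate);

    // InverseMapper flips each (word, count) to (count, word), so the
    // shuffle sorts on the count.
    sortJob.setMapperClass(InverseMapper.class);
    sortJob.setOutputKeyClass(IntWritable.class);
    sortJob.setOutputValueClass(Text.class);

    // A single reducer yields one globally sorted output file.
    sortJob.setNumReduceTasks(1);

    // Biggest counts first.
    sortJob.setSortComparatorClass(DescendingIntComparator.class);

    FileOutputFormat.setOutputPath(sortJob, output);
    sortJob.waitForCompletion(true);
  }
}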
