Using Hadoop for the First Time, MapReduce Job does not run Reduce Phase


Problem Description

I wrote a simple MapReduce job that reads data from DFS and runs a simple algorithm on it. While trying to debug it, I decided to simply have the mappers output a single set of keys and values, and the reducers output an entirely different set. I am running this job on a single-node Hadoop 20.2 cluster. When the job finishes, the output contains only the values that were output by the mappers, leading me to believe that the reducer is not being run. I would greatly appreciate it if anyone could provide any insight as to why my code is producing such output. I have tried setting the outputKeyClass and outputValueClass to different things, as well as the setMapOutputKeyClass and setMapOutputValueClass. Currently the commented-out sections of the code are the algorithm I am running, but I have changed the map and reduce methods to simply output certain values. Once again, the output from the job contains only the values output by the mapper. Here is the class I use to run the job:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable; // used for histogram values in the reducer
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * @author redbeard
 */
public class CalculateHistogram {

public static class HistogramMap extends Mapper<LongWritable, Text, LongWritable, Text> {

    private static final int R = 100;
    private int n = 0;

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if (n == 0) {
            StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
            int counter = 0;
            while (tokens.hasMoreTokens()) {
                String token = tokens.nextToken();
                if (tokens.hasMoreTokens()) {
                    context.write(new LongWritable(-2), new Text("HI"));
                    //context.write(new LongWritable(counter), new Text(token));
                }
                counter++;
                n++;
            }
        } else {
            n++;
            if (n == R) {
                n = 0;
            }

        }
    }
}

public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {

    private final static int R = 10;

    public void reduce(LongWritable key, Iterator<Text> values, Context context)
                                        throws IOException, InterruptedException {
        if (key.toString().equals("-1")) {
            //context.write(key, new HistogramBucket(key));
        }
        Text t = values.next();
        for (char c : t.toString().toCharArray()) {
            if (!Character.isDigit(c) && c != '.') {
                //context.write(key, new HistogramBucket(key)); // if this isn't a numerical attribute, we ignore it
            }
        }
        context.setStatus("Building Histogram");
        HistogramBucket i = new HistogramBucket(key);
        i.add(new DoubleWritable(Double.parseDouble(t.toString())));
        while (values.hasNext()) {
            for (int j = 0; j < R; j++) {
                t = values.next();
            }
            if (!i.contains(Double.parseDouble(t.toString()))) {
                context.setStatus("Writing a value to the Histogram");
                i.add(new DoubleWritable(Double.parseDouble(t.toString())));
            }
        }

        context.write(new LongWritable(55555555), new HistogramBucket(new LongWritable(55555555)));
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }

    Job job = new Job(conf, "MRDT - Generate Histogram");
    job.setJarByClass(CalculateHistogram.class);
    job.setMapperClass(HistogramMap.class);
    job.setReducerClass(HistogramReduce.class);

    //job.setOutputValueClass(HistogramBucket.class);

    //job.setMapOutputKeyClass(LongWritable.class);
    //job.setMapOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

Solution

The signature of your reduce method is wrong. Your method takes an "Iterator<Text>", but it must take an "Iterable<Text>".

Because of the mismatched parameter type, your code does not override the reduce method of the Reducer base class. As a result, the default implementation provided by the Reducer base class is used. That implementation is an identity function, which is why your output contains exactly what the mappers wrote.
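
For reference, the default reduce in the Reducer base class looks roughly like this (a simplified sketch of Hadoop's identity implementation, written in terms of the class's generic type parameters):

protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
    for (VALUEIN value : values) {
        // Each key/value pair is written out unchanged.
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}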

Use the @Override annotation to catch errors like this one at compile time: the compiler will reject a method marked @Override that does not actually override anything.
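
Putting both points together, here is a minimal sketch of the corrected method; the rest of your reduce logic stays the same, except that the Iterator is now obtained from the Iterable:

@Override
public void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    // With the matching signature, the framework calls this method
    // instead of the base class's identity implementation.
    Iterator<Text> it = values.iterator();
    Text t = it.next();
    // ... the rest of the original logic, using it.hasNext()/it.next()
    //     in place of values.hasNext()/values.next()
}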
