Output file contains Mapper output instead of Reducer output


Problem description

Hi, I am trying to find the average of a few numbers using the MapReduce technique in standalone mode. I have two input files. They contain the values file1: 25 25 25 25 25 and file2: 15 15 15 15 15.

My program runs fine, but the output file contains the mapper's output instead of the reducer's output.

Here is my code:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Writable;
import java.io.*;


public class Average {

    public static class SumCount implements Writable {

        public int sum;
        public int count;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(sum);
            out.writeInt(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            sum = in.readInt();
            count =in.readInt();
        }  
    }
  public static class TokenizerMapper extends Mapper<Object, Text, Text, Object>{

    private final static IntWritable valueofkey = new IntWritable();
    private Text word = new Text();
    SumCount sc=new SumCount();
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      int sum=0;
      int count=0;
      int v;
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        v=Integer.parseInt(word.toString());
        count=count+1;
        sum=sum+v;       
      }
      word.set("average");
      sc.sum=sum;
      sc.count=count;
      context.write(word,sc);
    }
  }

  public static class IntSumReducer extends Reducer<Text,Object,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<SumCount> values,Context context) throws IOException, InterruptedException {
      int sum = 0;
      int count=0;
      int wholesum=0;
      int wholecount=0;
      for (SumCount val : values) {
        wholesum=wholesum+val.sum;
        wholecount=wholecount+val.count;
      }
      int res=wholesum/wholecount;
      result.set(res);
      context.write(key, result );
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "");
    job.setJarByClass(Average.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(SumCount.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

After I run the program, my output file looks like this:

average Average$SumCount@434ba039
average Average$SumCount@434ba039

Solution

You can't use your Reducer class IntSumReducer as a combiner. A combiner must receive and emit the same Key/Value types.

So I would remove job.setCombinerClass(IntSumReducer.class);.

Remember that the output from the combiner is the input to the reducer, so writing out Text and IntWritable isn't going to work.
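
If you did want a combiner for this job, it would have to receive and emit Text/SumCount pairs itself, so the reducer still sees SumCount values. A minimal sketch of such a combiner (SumCountCombiner is a hypothetical name, not part of the original code; it slots into the Average class alongside the existing mapper and reducer, reusing the imports already in the file):

  // Hypothetical combiner: a combiner must emit the same key/value
  // types it receives, so it pre-aggregates SumCount values without
  // changing what the reducer expects.
  public static class SumCountCombiner
      extends Reducer<Text, SumCount, Text, SumCount> {
    private final SumCount partial = new SumCount();

    @Override
    public void reduce(Text key, Iterable<SumCount> values, Context context)
        throws IOException, InterruptedException {
      partial.sum = 0;
      partial.count = 0;
      for (SumCount val : values) {
        partial.sum += val.sum;     // merge per-split sums
        partial.count += val.count; // and counts before the shuffle
      }
      context.write(key, partial);  // Text in, SumCount out: types match
    }
  }

It would then be registered with job.setCombinerClass(SumCountCombiner.class); in place of the reducer class.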

If your output files are named like part-m-xxxxx, then the above issue could mean the job only ran the map phase and stopped. Your job counters would confirm this.

You also have Reducer<Text,Object,Text,IntWritable>, which should be Reducer<Text,SumCount,Text,IntWritable>.
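
To see why that matters: with Reducer<Text,Object,Text,IntWritable>, the handwritten reduce(Text, Iterable<SumCount>, Context) does not override the framework's reduce method, it merely overloads it, so Hadoop falls back to the default identity reduce. The mapper's SumCount values then pass straight to the output file and are printed with Object's default toString(), which matches the Average$SumCount@434ba039 lines you are seeing. A minimal sketch of the corrected reducer (the body is unchanged from the question; only the generics and an @Override annotation change):

  // Corrected generics: VALUEIN is SumCount, matching the mapper's
  // output value type, so this reduce() actually overrides the
  // framework method instead of silently overloading it.
  public static class IntSumReducer
      extends Reducer<Text, SumCount, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override  // compiles now: the signature matches the superclass
    public void reduce(Text key, Iterable<SumCount> values, Context context)
        throws IOException, InterruptedException {
      int wholesum = 0;
      int wholecount = 0;
      for (SumCount val : values) {
        wholesum += val.sum;
        wholecount += val.count;
      }
      result.set(wholesum / wholecount); // integer average over all inputs
      context.write(key, result);
    }
  }

With this in place, either drop job.setCombinerClass(IntSumReducer.class); entirely or point it at a type-compatible combiner like the sketch above.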
