CombineFileInputFormat launches only one map always Hadoop 1.2.1


Problem description

I am trying to test CombineFileInputFormat to process a few small files (20 files) of 8 MB each. I followed the sample given in this blog. I was able to implement and test it, and the end result is correct. But what is surprising to me is that it always ends up with only one map. I tried setting the attribute "mapred.max.split.size" to various values like 16 MB, 32 MB, etc. (in bytes, of course) without any success. Is there anything else I need to do, or is this the right behavior?

I am running a two-node cluster with default replication of 2. Given below is the code I developed. Any help is highly appreciated.

    package inverika.test.retail;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class CategoryCount {
    
        public static class CategoryMapper
            extends Mapper<LongWritable, Text, Text, IntWritable>    {
    
            private final static IntWritable one = new IntWritable(1);
            private String[] columns = new String[8];
    
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws     IOException, InterruptedException {
                columns = value.toString().split(",");  
                context.write(new Text(columns[4]), one);
            }
        }
    
        public static class CategoryReducer
            extends Reducer< Text, IntWritable, Text, IntWritable>    {
    
            @Override
            public void reduce(Text key, Iterable<IntWritable>  values, Context context)
                    throws     IOException, InterruptedException {
    
                    int sum = 0;
    
                    for (IntWritable value :  values) {
                            sum += value.get();
                    }
                   context.write(key, new IntWritable(sum));
            }
        }
    
        public static void main(String args[]) throws Exception    {
            if (args.length != 2)  {
                    System.err.println("Usage: CategoryCount <input Path> <output Path>");
                    System.exit(-1);
            } 
    
            Configuration conf = new Configuration();
            conf.set("mapred.textoutputformat.separator", ",");
            conf.set("mapred.max.split.size", "16777216");   // 16 MB
    
            Job job = new Job(conf, "Retail Category Count");
            job.setJarByClass(CategoryCount.class);
            job.setMapperClass(CategoryMapper.class);
            job.setReducerClass(CategoryReducer.class);
            job.setInputFormatClass(CombinedInputFormat.class);
            //CombineFileInputFormat.setMaxInputSplitSize(job, 16777216);
            CombinedInputFormat.setMaxInputSplitSize(job, 16777216);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setOutputFormatClass(TextOutputFormat.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]) );
            FileOutputFormat.setOutputPath(job, new Path(args[1]) );
            //job.submit();
            //System.exit(job.waitForCompletion(false) ?  0 : 1);
            System.exit(job.waitForCompletion(true) ?  0 : 1);
        }
    }
    

Here is the CombinedInputFormat implementation:

    package inverika.test.retail;
    
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    
    public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {
    
        @Override
        public RecordReader<LongWritable, Text>
                createRecordReader(InputSplit split, TaskAttemptContext context)
                        throws IOException {
    
            CombineFileRecordReader<LongWritable, Text> reader = 
                    new CombineFileRecordReader<LongWritable, Text>(
                            (CombineFileSplit) split, context, myCombineFileRecordReader.class);        
            return reader;
        }
    
        public static class myCombineFileRecordReader extends RecordReader<LongWritable, Text> {
            private LineRecordReader lineRecordReader = new LineRecordReader();
    
            public myCombineFileRecordReader(CombineFileSplit split, 
                    TaskAttemptContext context, Integer index) throws IOException {
    
                FileSplit fileSplit = new FileSplit(split.getPath(index), 
                                                    split.getOffset(index),
                                                    split.getLength(index), 
                                                    split.getLocations());
                lineRecordReader.initialize(fileSplit, context);
            }
    
            @Override
            public void initialize(InputSplit inputSplit, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                //linerecordReader.initialize(inputSplit, context);
            }
    
            @Override
            public void close() throws IOException {
                lineRecordReader.close();
            }
    
            @Override
            public float getProgress() throws IOException {
                return lineRecordReader.getProgress();
            }
    
            @Override
            public LongWritable getCurrentKey() throws IOException,
                    InterruptedException {
                return lineRecordReader.getCurrentKey();
            }
    
            @Override
            public Text getCurrentValue() throws IOException, InterruptedException {
                return lineRecordReader.getCurrentValue();
            }
    
            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                return lineRecordReader.nextKeyValue();
            }        
        }
    }
    

Solution

You need to set the maximum split size when using CombineFileInputFormat as the input format class. Otherwise you will probably get exactly one mapper when all the blocks come from the same rack.

You can achieve this in one of the following ways:

• call the CombineFileInputFormat.setMaxSplitSize() method (a sketch of this approach follows below)
• set the mapreduce.input.fileinputformat.split.maxsize or mapred.max.split.size (deprecated) configuration parameter

  For example, by issuing the following call

      job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", (long)(256*1024*1024));

  you are setting the maximum split size to 256 MB.
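
For the first option, note that setMaxSplitSize() is a protected method on CombineFileInputFormat, so it is typically called from inside the subclass rather than from the driver. Below is a minimal sketch of that pattern applied to the CombinedInputFormat from the question; the 16 MB cap is illustrative, not a recommended value.

    package inverika.test.retail;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;

    // Sketch only: calling the protected setMaxSplitSize() in the constructor
    // caps every combined split produced by this input format, without any
    // extra configuration in the job driver.
    public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

        public CombinedInputFormat() {
            super();
            setMaxSplitSize(16 * 1024 * 1024);   // 16 MB per combined split
        }

        // createRecordReader(...) stays exactly as in the question's implementation.
    }

With a 16 MB cap in place, the twenty 8 MB input files should be grouped into several combined splits, and therefore several map tasks, rather than a single one.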


References:

• https://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html
• http://mail-archives.apache.org/mod_mbox/hadoop-common-user/201004.mbox/%3C35374.30384.qm@web63402.mail.re1.yahoo.com%3E
