How to divide a big dataset into multiple small files in Hadoop in an efficient way


Problem description

I have a big data set consisting of files with 1M records each, and I'd like to divide it into files of 1000 records each in Hadoop. I'm investigating different scenarios for achieving this goal. One is to make the split size small so that each mapper takes only a few records (~1000) and then outputs them; this requires running many mappers, which is not efficient. The other solution is to use a single reducer, send all the records to it, and do the split there; this is also counter-intuitive for MapReduce, since all the work is done by only one node. What is an efficient alternative for splitting this dataset into small files?

Solution

You can use NLineInputFormat to specify how many records should be given as input to each mapper.

Set the property 'mapreduce.input.lineinputformat.linespermap' to a multiple of 1000 so that a reasonable number of mappers is spawned. In the mapper, use MultipleOutputs to write each batch of 1000 records to a separate file, using a counter to number the files.
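
For reference, here is a minimal driver-side sketch of that configuration, assuming the Hadoop 2.x MapReduce API; the class name and the 100000-line value are placeholders, and the rest of the job setup would follow the full example below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class NLineSetupSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Each input split (and therefore each mapper) receives this many lines;
        // a multiple of 1000 keeps every mapper producing whole 1000-record files.
        conf.setInt("mapreduce.input.lineinputformat.linespermap", 100000);

        Job job = Job.getInstance(conf, "NLineSetupSketch");
        job.setInputFormatClass(NLineInputFormat.class);
        // Equivalent to setting the property above:
        // NLineInputFormat.setNumLinesPerSplit(job, 100000);

        // ... mapper, output format and paths as in the full example below ...
    }
}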

Sample code that uses MultipleOutputs to split the data into files of 1000 records each (for text files):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DataSplitter {

    // Mapper: buffers incoming lines and writes every full batch of 1000 records
    // to a separately numbered file via MultipleOutputs.
    public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> {

        private Text outputValue = new Text();

        @SuppressWarnings("rawtypes")
        private MultipleOutputs multipleOutputs;

        private int fileCounter = 1;

        private List<String> recordList = new ArrayList<String>();

        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Override
        protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {

            multipleOutputs = new MultipleOutputs(context);

        }

        @SuppressWarnings("unchecked")
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            recordList.add(line);

            if (recordList.size() == 1000) {

                for (int i = 0; i < recordList.size(); i++) {

                    outputValue.set(recordList.get(i));

                    multipleOutputs.write("mos", NullWritable.get(), outputValue, "output-" + fileCounter);

                }

                fileCounter++;

                recordList.clear();
            }

        }

        @Override
        protected void cleanup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {

            multipleOutputs.close();

            // Records left over at the end (fewer than 1000) are emitted through the
            // normal map output, so the reducer can combine leftovers from all mappers.
            if (!recordList.isEmpty()) {

                for (int i = 0; i < recordList.size(); i++) {

                    outputValue.set(recordList.get(i));

                    context.write(NullWritable.get(), outputValue);

                }
                recordList.clear();

            }
        }

    }

    // Reducer: gathers the leftover records from all mappers and again writes them
    // out in batches of 1000. Its input key type must be NullWritable to match the
    // map output key.
    public static class Reduce extends Reducer<NullWritable, Text, NullWritable, Text> {

        private Text outputValue = new Text();

        @SuppressWarnings("rawtypes")
        private MultipleOutputs multipleOutputs;

        private int fileCounter = 1;

        private List<String> recordList = new ArrayList<String>();

        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        protected void setup(Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs(context);
        }

        @SuppressWarnings("unchecked")
        @Override
        public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            for (Text value : values) {

                recordList.add(value.toString());

                // Flush every full batch of 1000 records to the next numbered file.
                if (recordList.size() == 1000) {

                    for (int i = 0; i < recordList.size(); i++) {
                        outputValue.set(recordList.get(i));
                        multipleOutputs.write("mos", NullWritable.get(), outputValue, "output-" + fileCounter);
                    }
                    fileCounter++;
                    recordList.clear();
                }
            }

            // After all values have been consumed, write any remaining records
            // (fewer than 1000) to the regular job output.
            if (!recordList.isEmpty()) {

                for (int i = 0; i < recordList.size(); i++) {
                    outputValue.set(recordList.get(i));
                    context.write(NullWritable.get(), outputValue);
                }
                recordList.clear();
            }

        }

        @Override
        protected void cleanup(Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            super.cleanup(context);
            multipleOutputs.close();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "DataSplitter");
        job.setJarByClass(DataSplitter.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Delete any existing output directory so the job can be re-run.
        FileSystem.get(conf).delete(new Path(args[1]), true);

        // Register the named output "mos" that the mapper and reducer write to.
        MultipleOutputs.addNamedOutput(job, "mos", TextOutputFormat.class, NullWritable.class, Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) == true ? 0 : 1);
    }

}
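
A note on the output, under the assumption that the job is packaged into a jar and submitted in the usual way (for example, hadoop jar datasplitter.jar DataSplitter <input path> <output path>, where the jar name is only a placeholder): the full 1000-record batches should appear in the output directory as MultipleOutputs files named output-1, output-2, and so on (with the usual task suffixes such as -m-00000 or -r-00000), while only a final remainder of fewer than 1000 records lands in the standard part-r-* file written by the reducer.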
