Is MapReduce job using multithreading?




Question

I am curious whether a MapReduce job uses multiple threads on a single machine. For example, I have 10 servers in the Hadoop cluster; by default, if the input file is large enough, there will be 10 mappers. Does a single mapper use multiple threads on a single machine?

Solution

Does a single mapper use multiple threads on a single machine?

Yes. A MapReduce job can use a multithreaded mapper (multiple threads, or a thread pool, running the map method).

• I have used it for better CPU utilization in map-only HBase jobs...

MultithreadedMapper is a good fit if your operation is highly CPU intensive; it can increase speed.

The job's mapper class should be org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper instead of the regular org.apache.hadoop.mapreduce.Mapper; your own map logic stays in a plain Mapper subclass, which is registered with it via MultithreadedMapper.setMapperClass(), as the example below shows.

MultithreadedMapper has a different implementation of the run() method, shown below:

    run(org.apache.hadoop.mapreduce.Mapper.Context context)
    

    Run the application's maps using a thread pool.
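
Conceptually, that override spawns a fixed number of runner threads, each holding its own instance of your mapper; the threads serialize their reads from the shared input and run the map logic itself in parallel. Below is a minimal, self-contained illustration of that pattern in plain Java (not the actual Hadoop source; the class name ThreadPoolMapSketch and its toy record list are made up for the demo):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    // Minimal illustration of the pattern behind MultithreadedMapper.run():
    // N worker threads share one input source, serialize only the
    // "read next record" step, and do the per-record map work concurrently.
    public class ThreadPoolMapSketch {
        public static void main(String[] args) throws InterruptedException {
            Iterator<String> records = Arrays.asList("a", "b", "c", "d", "e", "f").iterator();
            int numThreads = 3; // like MultithreadedMapper.setNumberOfThreads(job, 3)

            List<Thread> runners = new ArrayList<Thread>();
            for (int i = 0; i < numThreads; i++) {
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        while (true) {
                            String record;
                            synchronized (records) {          // reads are serialized...
                                if (!records.hasNext()) return;
                                record = records.next();
                            }
                            // ...but the "map" work itself runs in parallel
                            System.out.println(Thread.currentThread().getName() + " maps " + record);
                        }
                    }
                });
                runners.add(t);
                t.start();
            }
            for (Thread t : runners) t.join(); // run() returns once every runner finishes
        }
    }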

You can set the number of threads for the mapper with MultithreadedMapper.setNumberOfThreads(job, n); or you can set the property mapred.map.multithreadedrunner.threads = n from a property file as a default, and use the setter above on a per-job basis to control jobs that are less CPU intensive.
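
For instance (a minimal sketch; ThreadCountConfig is a made-up driver, and the property could equally be loaded from a configuration file or passed with -D on the command line):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

    import java.io.IOException;

    // Sketch: two equivalent ways to control the mapper thread count.
    public class ThreadCountConfig {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            // Option 1: set the property directly (or load it from a config file,
            // or pass -Dmapred.map.multithreadedrunner.threads=10 on the command line).
            conf.setInt("mapred.map.multithreadedrunner.threads", 10);

            Job job = Job.getInstance(conf);
            // Option 2: override it per job through the typed setter.
            MultithreadedMapper.setNumberOfThreads(job, 10);
        }
    }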

You can see the effect of doing this in the MapReduce counters, especially the CPU-related counters.
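
For example, after the job finishes you could print the CPU counters and compare runs with different thread counts (a sketch; TaskCounter.CPU_MILLISECONDS and TaskCounter.GC_TIME_MILLIS are the standard counters in org.apache.hadoop.mapreduce, and CpuCounterCheck is a made-up helper):

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCounter;

    // Sketch: after job.waitForCompletion(true), read the CPU-related
    // counters to measure the effect of the multithreaded mapper.
    public class CpuCounterCheck {
        static void printCpuCounters(Job job) throws Exception {
            Counters counters = job.getCounters();
            long cpuMillis = counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue();
            long gcMillis = counters.findCounter(TaskCounter.GC_TIME_MILLIS).getValue();
            System.out.println("CPU ms: " + cpuMillis + ", GC ms: " + gcMillis);
        }
    }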

Example code snippet of a MultithreadedMapper implementation:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    import java.io.IOException;
    
    
    public class MultithreadedWordCount {
    
        // class should be thread safe
        public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            public static enum PREPOST { SETUP, CLEANUP }
    
            @Override
            protected void setup(Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, java.lang.InterruptedException {
                // will be called several times
                context.getCounter(PREPOST.SETUP).increment(1);
            }
    
            @Override
            protected void map(LongWritable key, Text value,
                         Context context) throws IOException, InterruptedException {
    
                String[] words = value.toString().toLowerCase().split("[\\p{Blank}[\\p{Punct}]]+");
                for (String word : words) {
                    context.write(new Text(word), new LongWritable(1));
                }
            }
    
            @Override
            protected void cleanup(Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException {
                // will be called several times
                context.getCounter(PREPOST.CLEANUP).increment(1);
            }
        }
    
        public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values, Context context
                            ) throws IOException, InterruptedException {
                long sum = 0;
                for (LongWritable value: values) {
                  sum += value.get();
                }
                context.write(key, new LongWritable(sum));
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance();
            job.setJarByClass(MultithreadedWordCount.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            MultithreadedMapper.setMapperClass(job, MultithreadedWordCount.WordCountMapper.class);
            MultithreadedMapper.setNumberOfThreads(job, 10);
    
            job.setMapperClass(MultithreadedMapper.class);
            job.setCombinerClass(MultithreadedWordCount.WordCountReducer.class);
            job.setReducerClass(MultithreadedWordCount.WordCountReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            /* begin defaults */
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            /* end defaults */
    
            job.waitForCompletion(true);
        }
    }
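
A caveat the // class should be thread safe comment hints at: map() is invoked concurrently by several threads, so any state shared between them (static fields, external connections, caches) must be synchronized. That is also why the example counts SETUP and CLEANUP invocations: with MultithreadedMapper, setup() and cleanup() run once per thread rather than once per task, which is what the "will be called several times" comments refer to.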
    
