Mapreduce Job - Taking Too Long to Complete


Problem Description

We have written a MapReduce job to process log files. We currently have about 52 GB of input, but processing it takes roughly an hour. By default the job creates only one reducer. We often see a timeout error in the reduce task, after which it restarts and eventually completes. Below are the counters from a successful run. Please let us know how the performance can be improved.

File System Counters
            FILE: Number of bytes read=876100387
            FILE: Number of bytes written=1767603407
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=52222279591
            HDFS: Number of bytes written=707429882
            HDFS: Number of read operations=351
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=2
    Job Counters 
            Failed reduce tasks=1
            Launched map tasks=116
            Launched reduce tasks=2
            Other local map tasks=116
            Total time spent by all maps in occupied slots (ms)=9118125
            Total time spent by all reduces in occupied slots (ms)=7083783
            Total time spent by all map tasks (ms)=3039375
            Total time spent by all reduce tasks (ms)=2361261
            Total vcore-seconds taken by all map tasks=3039375
            Total vcore-seconds taken by all reduce tasks=2361261
            Total megabyte-seconds taken by all map tasks=25676640000
            Total megabyte-seconds taken by all reduce tasks=20552415744
    Map-Reduce Framework
            Map input records=49452982
            Map output records=5730971
            Map output bytes=864140911
            Map output materialized bytes=876101077
            Input split bytes=13922
            Combine input records=0
            Combine output records=0
            Reduce input groups=1082133
            Reduce shuffle bytes=876101077
            Reduce input records=5730971
            Reduce output records=5730971
            Spilled Records=11461942
            Shuffled Maps =116
            Failed Shuffles=0
            Merged Map outputs=116
            GC time elapsed (ms)=190633
            CPU time spent (ms)=4536110
            Physical memory (bytes) snapshot=340458307584
            Virtual memory (bytes) snapshot=1082745069568
            Total committed heap usage (bytes)=378565820416
    Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
    File Input Format Counters 
            Bytes Read=52222265669
    File Output Format Counters 
            Bytes Written=707429882
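
The job above relies on the default single reducer. For reference, the reducer count can be raised explicitly in the driver; the sketch below is illustrative only and assumes a standard Job-based driver (the class name ReducerCountExample and the value 8 are not taken from the actual job).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "exchange-log-processing");

        // The default single reduce task pushes all ~876 MB of shuffled map
        // output through one JVM; a higher count spreads that work across
        // the cluster. The value 8 is only an example.
        job.setNumReduceTasks(8);

        // Mapper, reducer, partitioner and input/output setup would go here,
        // exactly as in the real job (not shown in the question).
    }
}

The same setting can also be passed on the command line as -D mapreduce.job.reduces=8 when the driver uses ToolRunner.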

If I increase the number of reducers, I get a ClassCastException as shown below. I suspect the problem is in the partitioner class.

java.lang.Exception: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text
    at com.emaar.bigdata.exchg.logs.ActualKeyPartitioner.getPartition(ActualKeyPartitioner.java:1)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:716)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:56)
    at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 

My Partitioner class

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ActualKeyPartitioner extends Partitioner<CompositeKey, Text> {

    HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>();
    Text newKey = new Text();

    @Override
    public int getPartition(CompositeKey key, Text value, int numReduceTasks) {

        try {
            // Execute the default partitioner over the first part of the key
            newKey.set(key.getSubject());
            return hashPartitioner.getPartition(newKey, value, numReduceTasks);
        } catch (Exception e) {
            e.printStackTrace();
            return (int) (Math.random() * numReduceTasks); // fall back to a random partition in [0, numReduceTasks)
        }
    }
}
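
One likely reading of the stack trace is that the partitioner's value type parameter (Text) does not match the value class the mapper actually writes (CompositeWritable, as the mapper code below shows), so the framework's call into getPartition fails when it casts the value. The following is only a sketch of a partitioner declared over those output types; SubjectPartitioner is an illustrative name, and CompositeKey.getSubject() is assumed to behave as in the class above.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch only: same idea as ActualKeyPartitioner, but the value type matches
// what ExchgLogsMapper writes, so no cast of the value to Text is attempted.
public class SubjectPartitioner extends Partitioner<CompositeKey, CompositeWritable> {

    private final Text subject = new Text();

    @Override
    public int getPartition(CompositeKey key, CompositeWritable value, int numReduceTasks) {
        // Partition on the first part of the composite key only, so all records
        // sharing a subject reach the same reducer. This is the same arithmetic
        // HashPartitioner uses internally.
        subject.set(key.getSubject());
        return (subject.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}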

Mapper Code

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class ExchgLogsMapper extends Mapper<LongWritable, List<Text>, CompositeKey, Writable> {
    String recepientAddresses = "";
    public static final String DELIVER = "DELIVER";
    public static final String RESOLVED = "Resolved";
    public static final String JUNK = "Junk E-mail";
    public static final String SEMICOLON = ";";
    public static final String FW1 = "FW: ";
    public static final String FW2 = "Fw: ";
    public static final String FW3 = "FWD: ";
    public static final String FW4 = "Fwd: ";
    public static final String FW5 = "fwd: ";
    public static final String RE1 = "RE: ";
    public static final String RE2 = "Re: ";
    public static final String RE3 = "re: ";


    Text mailType = new Text("NEW");
    Text fwType = new Text("FW");
    Text reType = new Text("RE");
    Text recepientAddr = new Text();

    @Override
    public void map(LongWritable key, List<Text> values, Context context) throws IOException, InterruptedException {
        String subj = null;
        int lstSize=values.size() ;
        if ((lstSize >= 26)) {
            if (values.get(8).toString().equals(DELIVER)) {
                if (!(ExclusionList.exclusions.contains(values.get(18).toString()))) {
                    if (!(JUNK.equals((values.get(12).toString())))) {
                        subj = values.get(17).toString();
                        recepientAddresses = values.get(11).toString();
                        String[] recepientAddressArr = recepientAddresses.split(SEMICOLON);
                        if (subj.startsWith(FW1) || subj.startsWith(FW2) || subj.startsWith(FW3)
                                || subj.startsWith(FW4) || subj.startsWith(FW5)) {
                            mailType = fwType;
                            subj = subj.substring(4);
                        } else if (subj.startsWith(RE1) || subj.startsWith(RE2) || subj.startsWith(RE3)) {
                            mailType = reType;
                            subj = subj.substring(4);
                        }
                        for (int i = 0; i < recepientAddressArr.length; i++) {
                            CompositeKey ckey = new CompositeKey(subj, values.get(0).toString());
                            recepientAddr.set(recepientAddressArr[i]);
                            CompositeWritable out = new CompositeWritable(mailType, recepientAddr, values.get(18),
                                    values.get(0));
                            context.write(ckey, out);
//                          System.err.println(out);

                        }
                    }
                }
            }
        }
    }
}

Solution

There were a few sysouts inside a loop in the reducer code that were writing a huge amount of log output; after removing them, the reducer finished within a couple of minutes.
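
The reducer itself is not shown in the question, so the following is only a hypothetical sketch of that fix: the per-record System.out/System.err prints inside the value loop are removed, and a counter is used instead if some visibility is still wanted. The class name and the Text/Text output types are assumptions.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical skeleton; the real reducer class and its output types are not
// shown in the question.
public class ExchgLogsReducer extends Reducer<CompositeKey, CompositeWritable, Text, Text> {

    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    protected void reduce(CompositeKey key, Iterable<CompositeWritable> values, Context context)
            throws IOException, InterruptedException {
        for (CompositeWritable value : values) {
            // No System.out.println / System.err.println here: at millions of
            // records, per-record writes to the task logs dominate the run time.
            context.getCounter("ExchgLogs", "ReduceRecords").increment(1);

            outKey.set(key.getSubject());
            outValue.set(value.toString());
            context.write(outKey, outValue);
        }
    }
}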

