Getting a Java heap space error when running MapReduce code on a large dataset


Problem Description

I am a beginner in MapReduce programming and have written the following Java program to run on a Hadoop cluster comprising 1 NameNode and 3 DataNodes:

package trial;

import java.io.IOException;
import java.util.*;
import java.lang.Iterable;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;


public class Trial 
{

public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
{

    public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
    {
       String[] rows = value.toString().split("\r?\n");          
       for(int i=0;i<rows.length;i++)
       {
           String[] cols = rows[i].toString().split(",");

           String v=cols[0];
           for(int j=1;j<cols.length;j++)
           {
               String k =j+","+cols[j];
               output.collect(new Text(k),new Text(v));
           }
       }


   }
}


public static class ReduceA extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException 
        {
            int count =0;                
            String[] attr = key.toString().split(",");      
            List<String> list = new ArrayList<String>();

           while(values.hasNext())               
            {
                list.add((values.next()).toString());
                count++;

            }

           String v=Integer.toString(count);
           for(String s:list)
           { 
               output.collect(new Text(s),new Text(v));
           }

        }   

}




public static void main(String[] args) throws IOException
{
    JobConf conf1 = new JobConf(Trial.class);
    conf1.setJobName("Trial");

    conf1.setOutputKeyClass(Text.class);
    conf1.setOutputValueClass(Text.class);

    conf1.setMapperClass(MapA.class);
    //conf.setCombinerClass(Combine.class);
    conf1.setReducerClass(ReduceA.class);

    conf1.setInputFormat(TextInputFormat.class);
    conf1.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf1, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf1, new Path(args[1]));

    JobClient.runJob(conf1);

    JobConf conf2 = new JobConf(Final.class);
    conf2.setJobName("Final");

    conf2.setOutputKeyClass(Text.class);
    conf2.setOutputValueClass(Text.class);

    conf2.setMapperClass(Final.MapB.class);
    //conf.setCombinerClass(Combine.class);
    conf2.setReducerClass(Final.ReduceB.class);

    conf2.setInputFormat(TextInputFormat.class);
    conf2.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf2, new Path(args[1]));
    FileOutputFormat.setOutputPath(conf2, new Path(args[2]));

    JobClient.runJob(conf2);


  }


  }  

class Final
{

public static class MapB extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
{

    public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
    {
       String[] r = value.toString().split("\r?\n");
       String[] p1= new String[5];

       for(int i=0;i<r.length;i++)
       {
           p1 = r[i].split("\t");               
           output.collect(new Text(p1[0]),new Text(p1[1]));
       }

   }
}

 public static class ReduceB extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException 
        {
           int sum=0;
           while(values.hasNext())
           {
               String s = (values.next()).toString();
               int c=Integer.parseInt(s);
               sum+=c;
           }
           float avf =(float)sum/3;
           String count=Float.toString(avf);
           output.collect(key,new Text(count));
        }   

}

}

The program is run on a dataset like this:

ID1,1,2,3 
ID1,1,3,2
ID3,2,2,3

Each row has an ID followed by 3 comma-separated attributes. My problem is to find the frequency of the value of each attribute for each ID (along the column, not across the row, if the dataset is seen as a 2-D array), then sum up those frequencies for each ID and find the average. Thus, for the above dataset:

ID1 : (2+2+2)/3 = 2
ID2 : (2+1+1)/3 = 1.33
ID3 : (1+2+2)/3 = 1.67
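
To make the arithmetic concrete (a worked sketch on the sample above): column 1 holds the values 1, 1, 2, so value 1 has frequency 2 and value 2 has frequency 1; column 2 holds 2, 3, 2 (frequencies 2 and 1); column 3 holds 3, 2, 3 (frequencies 2 and 1). The row (1,2,3) therefore collects the frequencies 2, 2, 2, averaging to 2; the row (1,3,2) collects 2, 1, 1, averaging to 1.33; and (2,2,3) collects 1, 2, 2, averaging to 1.67.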

The above code works well with small datasets of around 200-500 MB, but for datasets above 1 GB I get an error like this:

 map 100% reduce 50%
       14/04/12 12:33:06 INFO mapred.JobClient: Task Id :  attempt_201404121146_0002_r_000001_0, Status : FAILED
      Error: Java heap space
      attempt_201404121146_0002_r_000001_0: Exception in thread  "LeaseRenewer:hdfs@NameNode:8020" java.lang.OutOfMemoryError: Java heap space
      attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:397)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:436)
      attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:70)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:297)
     attempt_201404121146_0002_r_000001_0:     at java.lang.Thread.run(Thread.java:662)
     attempt_201404121146_0002_r_000001_0: Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError: Java heap space
     attempt_201404121146_0002_r_000001_0:     at java.util.AbstractList.iterator(AbstractList.java:273)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:363)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.mapred.Child$3.run(Child.java:158)
     14/04/12 12:33:10 INFO mapred.JobClient:  map 100% reduce 33%
     14/04/12 12:33:12 INFO mapred.JobClient: Task Id :    attempt_201404121146_0002_r_000003_0, Status : FAILED
     Error: Java heap space
      attempt_201404121146_0002_r_000003_0: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
     attempt_201404121146_0002_r_000003_0: log4j:WARN Please initialize the log4j system properly.
      attempt_201404121146_0002_r_000003_0: log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
     14/04/12 12:33:15 INFO mapred.JobClient:  map 100% reduce 16%
     14/04/12 12:33:16 INFO mapred.JobClient:  map 100% reduce 18%
     14/04/12 12:33:16 INFO mapred.JobClient: Task Id : attempt_201404121146_0002_r_000000_0, Status : FAILED
     Error: Java heap space
      attempt_201404121146_0002_r_000000_0: Exception in thread "LeaseRenewer:hdfs@NameNode:8020" java.lang.OutOfMemoryError: Java heap space
     attempt_201404121146_0002_r_000000_0:     at java.lang.StringCoding.set(StringCoding.java:53)
     attempt_201404121146_0002_r_000000_0:     at java.lang.StringCoding.decode(StringCoding.java:171)
     attempt_201404121146_0002_r_000000_0:     at java.lang.String.<init>(String.java:443)
     attempt_201404121146_0002_r_000000_0:     at java.util.jar.Attributes.read(Attributes.java:401)
      attempt_201404121146_0002_r_000000_0:     at java.util.jar.Manifest.read(Manifest.java:182)
      attempt_201404121146_0002_r_000000_0:     at java.util.jar.Manifest.<init>(Manifest.java:52)
       attempt_201404121146_0002_r_000000_0:     at java.util.jar.JarFile.getManifestFromReference(JarFile.java:167)
       attempt_201404121146_0002_r_000000_0:     at java.util.jar.JarFile.getManifest(JarFile.java:148)
       attempt_201404121146_0002_r_000000_0:     at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:696)
       attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader.defineClass(URLClassLoader.java:228)
        attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
        attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
       attempt_201404121146_0002_r_000000_0:     at      java.security.AccessController.doPrivileged(Native Method)
       attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
     attempt_201404121146_0002_r_000000_0:     at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
     attempt_201404121146_0002_r_000000_0:     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
   attempt_201404121146_0002_r_000000_0:     at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
   attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:400)
   attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:436)
  attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:70)
  attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:297)
  attempt_201404121146_0002_r_000000_0:     at java.lang.Thread.run(Thread.java:662)
 14/04/12 12:33:21 INFO mapred.JobClient:  map 100% reduce 20%

I think my program is consuming too much memory and needs to be optimized. I even tried to solve this by increasing my Java heap space to 1024 MB, but I still get the same error. The dataset I used was 1.4 GB, with 5 crore (50 million) rows and 9 attributes excluding the row ID. Since my problem concerns big data, testing the code with small data is not a solution. Can you please suggest how I can optimise my code so that the memory issue is resolved? Thanks in advance.
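
For reference, with the classic mapred API used above, the heap that matters for these task failures is the child task JVM's rather than the client JVM's; on Hadoop 1.x it is typically controlled through the mapred.child.java.opts property. A minimal sketch, assuming that property applies to the Hadoop version in use, added in main() before each JobClient.runJob call:

    conf1.set("mapred.child.java.opts", "-Xmx1024m"); // heap for the map/reduce child tasks of job 1
    conf2.set("mapred.child.java.opts", "-Xmx1024m"); // and of job 2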

Solution

Since traversing the iterator twice is not possible and your heap cannot handle the large number of values stored in a list, I suggest you add an intermediate MapReduce step, giving a total of three MapReduce steps for your job.

My proposal is as follows:

  • Step 1
    Mapper 1 outputs attributeID + "," + value => UserID
    Reducer 1 computes the total count for each key (attributeID + "," + value). First, it re-emits attributeID + "," + value => UserID as received from Mapper 1. Second, it emits "." + attributeID + "," + value => total_count. The dot is added as a prefix to ensure that all total_counts arrive at the next Reducer first, which is guaranteed by the sort phase (a worked trace on the question's sample dataset follows this list).

  • Step 2
    Mapper 2 does nothing other than output every input it receives.
    Reducer 2 is guaranteed to receive the total_counts first. As long as a row corresponds to a total_count, it stores it in a HashMap (attributeID + "," + value => total_count). Then, once it starts receiving the other rows, all it has to do is retrieve the corresponding total_count from the HashMap and output UserID => total_count.
    Note that only one Reducer should be used in this phase, so you have to set mapreduce.job.reduces to 1 (the driver sketch after the sample code shows one way to do this with the old API). You can reset it to your former value after this step.

  • Step 3
    Same as the second MapReduce step in your initial solution. Computes the average and outputs UserID => average.
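
To make the data flow concrete, here is roughly what Step 1 produces for the sample dataset in the question (a sketch; the actual files are tab-separated key/value lines written by TextOutputFormat):

    Mapper 1 output (attributeID,value => UserID):
      1,1 => ID1   2,2 => ID1   3,3 => ID1     (from ID1,1,2,3)
      1,1 => ID1   2,3 => ID1   3,2 => ID1     (from ID1,1,3,2)
      1,2 => ID3   2,2 => ID3   3,3 => ID3     (from ID3,2,2,3)

    Reducer 1 output for the key 1,1 (two values received):
      1,1 => ID1
      1,1 => ID1
      .1,1 => 2

Reducer 2 then sees all the dotted total_count rows first (the "." sorts before the digits), caches them in its HashMap, and afterwards turns each row such as 1,1 => ID1 into ID1 => 2.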

This solution is quite optimistic, as it assumes that your heap can handle your HashMap. Give it a try and see what happens.

Here is some sample code:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class Trial {

public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
{

public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
{
        String[] rows = value.toString().split("\r?\n");
        for (int i = 0; i < rows.length; i++) {
            String[] cols = rows[i].toString().split(",");

            String v = cols[0];
            for (int j = 1; j < cols.length; j++) {
                String k = j + "," + cols[j];
                output.collect(new Text(k), new Text(v));
            }
        }
}
}


public static class ReduceA extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        int count = 0;

        while (values.hasNext()) {
            output.collect(key, values.next());
            count++;
        }
        output.collect(new Text("." + key),
                new Text(count));
    }  

}


public static class MapB extends MapReduceBase implements Mapper<Text, Text, Text, Text> 
{

public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
    output.collect(key, value);
}
}


public static class ReduceB extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

private Map<String, Integer> total_count = new HashMap<String, Integer>();
private Set<String> attributes = new HashSet<String>(); // count the distinct number of attributes

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter) // value type must be Text to match Reducer<Text, Text, Text, Text>
            throws IOException {

        String rKey = key.toString();
        if(rKey.startsWith(".")){
            while (values.hasNext()) {
                total_count.put(rKey.substring(1), Integer.valueOf(values.next().toString()));
                attributes.add(rKey.substring(1).split(",")[0]);
                return;
            }
        }
        while (values.hasNext()) {
            Text value = values.next();
            output.collect(value, new Text(Integer.toString(total_count.get(rKey))));
            output.collect(value, new Text("." + attributes.size())); // send the total number of attributes
        }
    }  
}


public static class MapC extends MapReduceBase implements Mapper<Text, Text, Text, Text> 
{

public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
        output.collect(key, value);
    }
}

public static class ReduceC extends MapReduceBase implements Reducer<Text, Text, Text, DoubleWritable>
{

    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, DoubleWritable>output, Reporter reporter) throws IOException 
    {
       long sum = 0;
       int nbAttributes = 0;
       while(values.hasNext()){
           String value = values.next().toString(); // values are Text; convert to String
           if(value.startsWith(".")){ // check if line corresponds to the total number of attributes
               nbAttributes = Integer.parseInt(value.substring(1)); 
           } else{
               sum += Integer.parseInt(value);   
           }
       }
       output.collect(key, new DoubleWritable((double) sum / nbAttributes)); // cast to avoid integer division
    }   
}

} 
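
The sample above omits a driver, so here is a minimal sketch of a main() that could be dropped into the Trial class to chain the three jobs with the same old mapred API as the question. The job names, the extra output directory taken from args[3], and the use of KeyValueTextInputFormat for Steps 2 and 3 (so the tab-separated output of the previous step is read back as Text key/value pairs, matching MapB's and MapC's signatures) are assumptions, not part of the original answer. Note that conf2.setNumReduceTasks(1) forces the single reducer required by Step 2.

public static void main(String[] args) throws IOException {
    // Step 1: raw input -> attributeID,value => UserID, plus .attributeID,value => total_count
    JobConf conf1 = new JobConf(Trial.class);
    conf1.setJobName("Step1");
    conf1.setOutputKeyClass(Text.class);
    conf1.setOutputValueClass(Text.class);
    conf1.setMapperClass(MapA.class);
    conf1.setReducerClass(ReduceA.class);
    conf1.setInputFormat(TextInputFormat.class);
    conf1.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf1, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf1, new Path(args[1]));
    JobClient.runJob(conf1);

    // Step 2: a single reducer joins every attributeID,value pair with its total_count
    JobConf conf2 = new JobConf(Trial.class);
    conf2.setJobName("Step2");
    conf2.setOutputKeyClass(Text.class);
    conf2.setOutputValueClass(Text.class);
    conf2.setMapperClass(MapB.class);
    conf2.setReducerClass(ReduceB.class);
    conf2.setInputFormat(KeyValueTextInputFormat.class); // reads key<TAB>value from Step 1
    conf2.setOutputFormat(TextOutputFormat.class);
    conf2.setNumReduceTasks(1);                          // Step 2 needs exactly one reducer
    FileInputFormat.setInputPaths(conf2, new Path(args[1]));
    FileOutputFormat.setOutputPath(conf2, new Path(args[2]));
    JobClient.runJob(conf2);

    // Step 3: average the per-attribute counts for each UserID
    JobConf conf3 = new JobConf(Trial.class);
    conf3.setJobName("Step3");
    conf3.setMapOutputValueClass(Text.class);            // MapC emits Text, ReduceC emits DoubleWritable
    conf3.setOutputKeyClass(Text.class);
    conf3.setOutputValueClass(DoubleWritable.class);
    conf3.setMapperClass(MapC.class);
    conf3.setReducerClass(ReduceC.class);
    conf3.setInputFormat(KeyValueTextInputFormat.class);
    conf3.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf3, new Path(args[2]));
    FileOutputFormat.setOutputPath(conf3, new Path(args[3]));
    JobClient.runJob(conf3);
}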
