Hadoop MapReduce job stuck at map 100% reduce 51%


Problem description



So I am looking for an infinite loop somewhere, but I don't know whether something else could be causing this. I am running on four cluster nodes, so I am fairly sure the job is not running out of RAM, as has been suggested for other questions of this kind.

My code:

package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import util.hashing.*;



public class LatLong {


 public static class Map extends Mapper<Object, Text, Text, Text> {
    //private final static IntWritable one = new IntWritable(1);


    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] longLatArray = line.split(",");
        double longi = Double.parseDouble(longLatArray[0]);
        double lat = Double.parseDouble(longLatArray[1]);
        //List<Double> origLatLong = new ArrayList<Double>(2);
        //origLatLong.add(lat);
        //origLatLong.add(longi);
        Geohash inst = Geohash.getInstance();
        //encode is the library's encoding function
        String hash = inst.encode(lat,longi);
        //Using the first 5 characters just for testing purposes
        //Need to find the right one later
        int accuracy = 4;
        //hash of the thing is shortened to whatever I figure out
        //to be the right size of each tile
        Text shortenedHash = new Text(hash.substring(0,accuracy));
        Text origHash = new Text(hash);

        context.write(shortenedHash, origHash);
    }
 } 

 public static class Reduce extends Reducer<Text, Text, Text, Text> {

     private IntWritable totalTileElementCount = new IntWritable();
     private Text latlongimag = new Text();
     private Text dataSeparator = new Text();

     @Override
     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      int elementCount = 0;
      boolean first = true;
      Iterator<Text> it = values.iterator();
      String lat = new String();
      String longi = new String();
      Geohash inst = Geohash.getInstance();

      while (it.hasNext()) {
       elementCount = elementCount+1;
       if(first)
       {
           double[] doubleArray = (inst.decode(it.next().toString()));
           lat = Double.toString(doubleArray[0]);
           longi = Double.toString(doubleArray[1]);
           first = false;

       }



      }
      totalTileElementCount.set(elementCount);
      //Geohash inst = Geohash.getInstance();

      String mag = totalTileElementCount.toString();

      latlongimag.set(lat+","+ longi +","+mag+",");
      dataSeparator.set("");
      context.write(latlongimag, dataSeparator );
     }
 }

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "wordcount");
    job.setJarByClass(LatLong.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
 }

}       

Solution

Inside

while (it.hasNext()) {
       elementCount = elementCount+1;
       if(first)
       {
           double[] doubleArray = (inst.decode(it.next().toString()));
           lat = Double.toString(doubleArray[0]);
           longi = Double.toString(doubleArray[1]);
           first = false;
       }
  }

You set first = false;, so on the next iteration of the while (it.hasNext()) loop the if (first) branch is not entered and it.next() is never called again. Since the iterator never advances, it.hasNext() keeps returning true for any key that has more than one value, and you never leave the while loop. That is why the reducers hang.
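A minimal fix (a sketch, untested against the asker's util.hashing Geohash library) is to advance the iterator unconditionally, so every value is consumed even after the first one has been decoded:

while (it.hasNext()) {
    Text current = it.next();          // always advance the iterator
    elementCount = elementCount + 1;
    if (first) {
        // decode only the first geohash seen for this tile
        double[] doubleArray = inst.decode(current.toString());
        lat = Double.toString(doubleArray[0]);
        longi = Double.toString(doubleArray[1]);
        first = false;
    }
}

Equivalently, dropping the explicit iterator in favor of a for-each loop over values makes it impossible to forget the next() call:

for (Text val : values) {
    elementCount = elementCount + 1;
    if (first) {
        double[] doubleArray = inst.decode(val.toString());
        lat = Double.toString(doubleArray[0]);
        longi = Double.toString(doubleArray[1]);
        first = false;
    }
}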
