Hadoop Mapreduce作业卡住地图100%减少51% [英] Hadoop Mapreduce job stuck at map 100% reduce 51%
问题描述
我的代码:
package org.myorg;
import java.io.IOException;
import java.util。*;
导入org.apache.hadoop.fs.Path;
导入org.apache.hadoop.conf。*;
import org.apache.hadoop.io。*;
import org.apache.hadoop.mapreduce。*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
导入util.hashing。*;
公共类LatLong {
公共静态类Map扩展Mapper< Object,Text,Text,Text> {
// private final static IntWritable one = new IntWritable(1);
$ b $ public void map(Object key,Text value,Context context)throws IOException,InterruptedException {
String line = value.toString();
String [] longLatArray = line.split(,);
double longi = Double.parseDouble(longLatArray [0]);
double lat = Double.parseDouble(longLatArray [1]);
//列表< Double> origLatLong = new ArrayList< Double>(2);
//origLatLong.add(lat);
//origLatLong.add(longi);
Geohash inst = Geohash.getInstance();
// encode是库的编码函数
String hash = inst.encode(lat,longi);
//使用前5个字符仅用于测试目的
//稍后需要找到正确的一个
int accuracy = 4;
//将事物的哈希值缩短为任何我想出的结果
//成为每个瓦片的正确大小
Text shortenedHash = new Text(hash.substring(0,accuracy)) ;
文本origHash =新文本(散列);
context.write(shortenedHash,origHash);
}
}
public static class Reduce extends Reducer< Text,Text,Text,Text> {
private IntWritable totalTileElementCount = new IntWritable();
私人文本latlongimag = new Text();
private text dataSeparator = new Text();
$ b @Override
public void reduce(Text key,Iterable< Text> values,Context context)throws IOException,InterruptedException {
int elementCount = 0;
boolean first = true;
迭代器<文本> it = values.iterator();
String lat = new String();
String longi = new String();
Geohash inst = Geohash.getInstance(); (it.hasNext()){
elementCount = elementCount + 1;
;
if(first)
{
double [] doubleArray =(inst.decode(it.next()。toString()));
lat = Double.toString(doubleArray [0]);
longi = Double.toString(doubleArray [1]);
first = false;
}
}
totalTileElementCount.set(elementCount);
// Geohash inst = Geohash.getInstance();
String mag = totalTileElementCount.toString();
latlongimag.set(lat +,+ longi +,+ mag +,);
dataSeparator.set();
context.write(latlongimag,dataSeparator);
public static void main(String [] args)throws Exception {
Configuration conf = new Configuration();
工作职位=新职位(conf,wordcount);
job.setJarByClass(LatLong.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job,new Path(args [0]));
FileOutputFormat.setOutputPath(job,new Path(args [1]));
job.waitForCompletion(true);
}
}
(it.hasNext()){
elementCount = elementCount + 1; $ {
if(first)
{
double [] doubleArray =(inst.decode(it.next()。toString()));
lat = Double.toString(doubleArray [0]);
longi = Double.toString(doubleArray [1]);
first = false;
您将设置为first = false ; 如此在
while(it.hasNext())
循环迭代 if(first)
没有被输入,并且 it.next()
永远不会再被调用,所以如果它有多个元素
it.hasNext()
将会返回true,您将永远不会退出而
循环。
So, I am looking for an infinite loop somewhere, and I don't know if there is anything else that can cause this. I am using four cluster nodes, so I am pretty sure that there cannot be a lack of RAM, as has been suggested in other questions of the same kind.
My code:
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import util.hashing.*;
public class LatLong {
public static class Map extends Mapper<Object, Text, Text, Text> {
//private final static IntWritable one = new IntWritable(1);
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] longLatArray = line.split(",");
double longi = Double.parseDouble(longLatArray[0]);
double lat = Double.parseDouble(longLatArray[1]);
//List<Double> origLatLong = new ArrayList<Double>(2);
//origLatLong.add(lat);
//origLatLong.add(longi);
Geohash inst = Geohash.getInstance();
//encode is the library's encoding function
String hash = inst.encode(lat,longi);
//Using the first 5 characters just for testing purposes
//Need to find the right one later
int accuracy = 4;
//hash of the thing is shortened to whatever I figure out
//to be the right size of each tile
Text shortenedHash = new Text(hash.substring(0,accuracy));
Text origHash = new Text(hash);
context.write(shortenedHash, origHash);
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private IntWritable totalTileElementCount = new IntWritable();
private Text latlongimag = new Text();
private Text dataSeparator = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int elementCount = 0;
boolean first = true;
Iterator<Text> it = values.iterator();
String lat = new String();
String longi = new String();
Geohash inst = Geohash.getInstance();
while (it.hasNext()) {
elementCount = elementCount+1;
if(first)
{
double[] doubleArray = (inst.decode(it.next().toString()));
lat = Double.toString(doubleArray[0]);
longi = Double.toString(doubleArray[1]);
first = false;
}
}
totalTileElementCount.set(elementCount);
//Geohash inst = Geohash.getInstance();
String mag = totalTileElementCount.toString();
latlongimag.set(lat+","+ longi +","+mag+",");
dataSeparator.set("");
context.write(latlongimag, dataSeparator );
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setJarByClass(LatLong.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Inside
while (it.hasNext()) {
elementCount = elementCount+1;
if(first)
{
double[] doubleArray = (inst.decode(it.next().toString()));
lat = Double.toString(doubleArray[0]);
longi = Double.toString(doubleArray[1]);
first = false;
}
}
You set first = false;
so in next while (it.hasNext())
loop iteration the if(first)
is not entered and it.next()
is never called again, so if it
has more than one element it.hasNext()
will allways return true and you will never leave this while
loop.
这篇关于Hadoop Mapreduce作业卡住地图100%减少51%的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!