Hadoop - a reducer is not being initiated
Problem description
I am trying to run open source kNN join MapReduce hbrj algorithm on a Hadoop 2.6.0 for single node cluster - pseudo-distributed operation installed on my laptop (OSX). This is the code.
Mapper, reducer and the main driver:
public class RPhase2 extends Configured implements Tool
{
    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, IntWritable, RPhase2Value>
    {
        public void map(LongWritable key, Text value,
                        OutputCollector<IntWritable, RPhase2Value> output,
                        Reporter reporter) throws IOException
        {
            String line = value.toString();
            String[] parts = line.split(" +");
            // key format <rid1>
            IntWritable mapKey = new IntWritable(Integer.valueOf(parts[0]));
            // value format <rid2, dist>
            RPhase2Value np2v = new RPhase2Value(Integer.valueOf(parts[1]), Float.valueOf(parts[2]));
            System.out.println("############### key: " + mapKey.toString() + " np2v: " + np2v.toString());
            output.collect(mapKey, np2v);
        }
    }

    public static class Reduce extends MapReduceBase
        implements Reducer<IntWritable, RPhase2Value, NullWritable, Text>
    {
        int numberOfPartition;
        int knn;

        class Record {...}

        class RecordComparator implements Comparator<Record> {...}

        public void configure(JobConf job)
        {
            numberOfPartition = job.getInt("numberOfPartition", 2);
            knn = job.getInt("knn", 3);
            System.out.println("########## configuring!");
        }

        public void reduce(IntWritable key, Iterator<RPhase2Value> values,
                           OutputCollector<NullWritable, Text> output,
                           Reporter reporter) throws IOException
        {
            // initialize the pq
            RecordComparator rc = new RecordComparator();
            PriorityQueue<Record> pq = new PriorityQueue<Record>(knn + 1, rc);
            System.out.println("Phase 2 is at reduce");
            System.out.println("########## key: " + key.toString());
            // For each record we have a reduce task
            // value format <rid1, rid2, dist>
            while (values.hasNext())
            {
                RPhase2Value np2v = values.next();
                int id2 = np2v.getFirst().get();
                float dist = np2v.getSecond().get();
                Record record = new Record(id2, dist);
                pq.add(record);
                if (pq.size() > knn)
                    pq.poll();
            }
            while (pq.size() > 0)
            {
                output.collect(NullWritable.get(), new Text(key.toString() + " " + pq.poll().toString()));
                //break; // only output the first record
            }
        } // reduce
    } // Reducer

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), RPhase2.class);
        conf.setJobName("RPhase2");
        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(RPhase2Value.class);
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(MapClass.class);
        conf.setReducerClass(Reduce.class);

        int numberOfPartition = 0;
        List<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i)
        {
            try {
                if ("-m".equals(args[i])) {
                    //conf.setNumMapTasks(Integer.parseInt(args[++i]));
                    ++i;
                } else if ("-r".equals(args[i])) {
                    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else if ("-p".equals(args[i])) {
                    numberOfPartition = Integer.parseInt(args[++i]);
                    conf.setInt("numberOfPartition", numberOfPartition);
                } else if ("-k".equals(args[i])) {
                    int knn = Integer.parseInt(args[++i]);
                    conf.setInt("knn", knn);
                    System.out.println(knn + "~ hi");
                } else {
                    other_args.add(args[i]);
                }
                conf.setNumReduceTasks(numberOfPartition * numberOfPartition);
                //conf.setNumReduceTasks(1);
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from " + args[i-1]);
                return printUsage();
            }
        }
        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new RPhase2(), args);
    }
} // RPhase2
When I run this, the mapper succeeds but the job terminates suddenly, and the reducer is never instantiated. Moreover, no errors are ever printed (even in the log files). I know that also because the print statements in the Reducer's configure method never get printed. Output:
15/06/15 14:00:37 INFO mapred.LocalJobRunner: map task executor complete.
15/06/15 14:00:38 INFO mapreduce.Job: map 100% reduce 0%
15/06/15 14:00:38 INFO mapreduce.Job: Job job_local833125918_0001 completed successfully
15/06/15 14:00:38 INFO mapreduce.Job: Counters: 20
File System Counters
FILE: Number of bytes read=12505456
FILE: Number of bytes written=14977422
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=11408
HDFS: Number of bytes written=8724
HDFS: Number of read operations=216
HDFS: Number of large read operations=0
HDFS: Number of write operations=99
Map-Reduce Framework
Map input records=60
Map output records=60
Input split bytes=963
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=14
Total committed heap usage (bytes)=1717567488
File Input Format Counters
Bytes Read=2153
File Output Format Counters
Bytes Written=1645
What I have done so far:
I have been looking at similar questions, and the most frequent problem I found is failing to configure the map output formats when the outputs of the mapper and reducer differ. That is already done in the code above: conf.setMapOutputKeyClass(Class); conf.setMapOutputValueClass(Class);
In another post I found a suggestion to change reduce(..., Iterator<...>, ...) to (..., Iterable<...>, ...), which gave me trouble compiling. I could no longer use the .hasNext() and .next() methods, and I got this error:
error: Reduce is not abstract and does not override abstract method reduce(IntWritable,Iterator,OutputCollector,Reporter) in Reducer
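That error is expected: the old org.apache.hadoop.mapred.Reducer interface (the one this code implements via MapReduceBase) declares reduce with an Iterator parameter, so changing the parameter to Iterable means the method no longer overrides the interface method. The Iterable signature belongs to the newer org.apache.hadoop.mapreduce API. The two iteration styles can be sketched with plain java.util stand-ins (class and method names here are illustrative, not from the original code):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IterStyles {
    // Old mapred API style: reduce(K, Iterator<V>, OutputCollector, Reporter)
    // iterates explicitly with hasNext()/next().
    static int sumOldStyle(Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext())
            sum += values.next();
        return sum;
    }

    // New mapreduce API style: reduce(K, Iterable<V>, Context)
    // iterates with a for-each loop instead.
    static int sumNewStyle(Iterable<Integer> values) {
        int sum = 0;
        for (int v : values)
            sum += v;
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> vals = Arrays.asList(1, 2, 3);
        System.out.println(sumOldStyle(vals.iterator())); // 6
        System.out.println(sumNewStyle(vals));            // 6
    }
}
```

In short, you cannot mix the two: a class implementing the old interface must keep the Iterator signature, and switching to Iterable requires porting the whole job to the new API.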
If anyone has any hints or suggestions on what I can try to find what the issue is I would be very appreciative!
Just a note that I posted a question about my problem here before (Hadoop kNN join algorithm stuck at map 100% reduce 0%) but it did not get enough attention, so I wanted to re-ask it from a different perspective. You could use this link for more details on my log files.
I have figured out the problem, and it was something silly. If you look at the code above, numberOfPartition is set to 0 before the arguments are read, and the number of reducers is set to numberOfPartition * numberOfPartition. As the user, I never changed the number-of-partitions parameter (mostly because I simply copy-pasted the argument line from the provided README), so the job requested zero reducers, and that's why the reducer never even started.
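A job with setNumReduceTasks(0) is a map-only job, which is why it "completed successfully" at reduce 0% with no error. A minimal guard that would have caught this (the helper name is hypothetical, not from the original code) is to derive the reducer count from the partition argument but never let it fall to zero:

```java
// Illustrative guard for the bug above: RPhase2 calls
// conf.setNumReduceTasks(numberOfPartition * numberOfPartition), and with
// the default numberOfPartition = 0 that requests zero reducers, i.e. a
// map-only job that finishes without ever running reduce().
public class ReducerCountGuard {
    static int reducerCount(int numberOfPartition) {
        int reducers = numberOfPartition * numberOfPartition;
        // Fall back to a single reducer instead of silently going map-only.
        return reducers > 0 ? reducers : 1;
    }

    public static void main(String[] args) {
        System.out.println(reducerCount(0)); // 1 (fallback)
        System.out.println(reducerCount(3)); // 9
    }
}
```

Then conf.setNumReduceTasks(reducerCount(numberOfPartition)) would keep the reduce phase alive even when -p is never passed; failing loudly with an error when the partition count is missing would work just as well.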