How to re-run whole map/reduce in hadoop before job completion?


Problem description

I am using Hadoop Map/Reduce with Java.

Suppose I have completed a whole map/reduce job. Is there any way I could repeat the whole map/reduce part only, without ending the job? I mean, I DON'T want to use any chaining of different jobs, but only want the map/reduce part to repeat.

Thanks!

Answer

So I am more familiar with the Hadoop streaming APIs, but the approach should translate to the native APIs.

As I understand it, what you are trying to do is run several iterations of the same map() and reduce() operations on the input data.

Let's say your initial map() input data comes from the file input.txt and the output path is output+{iteration} (where iteration is the loop count, ranging over [0, number of iterations)). In the second invocation of map()/reduce(), your input path is output+{iteration} and the output path becomes output+{iteration+1}.
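In driver code this is just a loop that rotates paths: each pass's output path becomes the next pass's input path. Here is a minimal sketch of that rotation (the class name and argument layout are illustrative; the full runnable version is in the edit below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IterativeDriver {
  public static void main(String[] args) throws Exception {
    int iterations = Integer.parseInt(args[2]);  // number of passes, e.g. 3
    Path in = new Path(args[0]);                 // initial input, e.g. input.txt
    for (int i = 0; i < iterations; i++) {
      Path out = new Path(args[1] + i);          // output0, output1, ...
      Job job = new Job(new Configuration(), "iteration " + i);
      // set the same mapper/reducer classes here on every pass
      FileInputFormat.addInputPath(job, in);
      FileOutputFormat.setOutputPath(job, out);
      job.waitForCompletion(true);               // finish this pass before the next
      in = out;                                  // rotate: output becomes next input
    }
  }
}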

Let me know if this is not clear; I can conjure up a quick example and post a link here.

EDIT: For Java, I modified the Hadoop wordcount example to run multiple times:

package com.rorlig;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {

  // Emits (word, 1) for every token in the input.
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Sums the counts emitted for each word.
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    if (args.length != 3) {
      System.err.println("Usage: wordcount <in> <out> <iterations>");
      System.exit(2);
    }
    int iterations = Integer.parseInt(args[2]);
    Path inPath = new Path(args[0]);
    Path outPath = null;
    for (int i = 0; i < iterations; ++i) {
      // Each pass writes to <out><i> and the next pass reads from it.
      outPath = new Path(args[1] + i);
      Job job = new Job(conf, "word count");
      job.setJarByClass(WordCountJob.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, inPath);
      FileOutputFormat.setOutputPath(job, outPath);
      // Block until this pass completes before starting the next one.
      job.waitForCompletion(true);
      inPath = outPath;
    }
  }
}
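For reference, assuming the class above is packaged into a jar called wordcount.jar (a hypothetical name), a run such as

hadoop jar wordcount.jar com.rorlig.WordCountJob input output 3

would execute three chained passes: the first reads input and writes to output0, the second reads output0 and writes to output1, and the third reads output1 and writes to output2.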

Hope this helps!

