How to pass a variable between two MapReduce jobs

Problem description

I have chained two MapReduce jobs. Job1 has only one reducer, in which I compute a float value. I want to use this value in the reducer of Job2. This is my main method setup.

    public static String GlobalVriable;
    public static void main(String[] args) throws Exception {

        int runs = 0;
        for (; runs < 10; runs++) {
            String inputPath = "part-r-000" + nf.format(runs);
            String outputPath = "part-r-000" + nf.format(runs + 1);
            MyProgram.MR1(inputPath);
            MyProgram.MR2(inputPath, outputPath);
        }
    }

    public static void MR1(String inputPath)
            throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        conf.set("var1","");
        Job job = new Job(conf, "This is job1");
        job.setJarByClass(MyProgram.class);
        job.setMapperClass(MyMapper1.class);
        job.setReducerClass(MyReduce1.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        job.waitForCompletion(true);
        GlobalVriable = conf.get("var1"); // I am getting NULL here
    }

    public static void MR2(String inputPath, String outputPath)
            throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "This is job2");
        ...
    }

    public static class MyReduce1 extends
        Reducer<Text, FloatWritable, Text, FloatWritable> {

        public void reduce(Text key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {

            float s = 0;
            for (FloatWritable val : values) {
                s += val.get();
            }

            String sum = Float.toString(s);
            context.getConfiguration().set("var1", sum);
        }
    }

As you can see, I need to iterate the entire program multiple times. Job1 computes a single number from the input. Since it is just a single number and there are many iterations, I don't want to write it to HDFS and read it back. Is there a way to share the value computed in MyReduce1 and use it in the reducer of Job2?

UPDATE: I have tried passing the value using conf.set & conf.get. The value is not being passed.

Solution

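Setting a value on the Configuration inside a reducer does not work, because each task only has its own copy of the job configuration and changes made there are never sent back to the driver. Counters, on the other hand, are reported back to the driver, so here's how to pass back a float value via a counter.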

First, in the reducer of the first job, transform the float value into a long by multiplying it by 1000 (to maintain 3 digits of precision, for example) and put the result into a counter:

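@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    // floatValue is assumed to be a reducer field holding the value computed in reduce()
    long result = (long) (floatValue * 1000);
    context.getCounter("Result", "Result").increment(result);
}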
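
To make the precision trade-off of the scaling step concrete, here is a minimal sketch of the round trip in plain Java, with no Hadoop involved. The class name `ScaledCounterValue`, its `encode`/`decode` methods, and the `SCALE` constant are hypothetical names used only for illustration; they are not part of the answer's code or of the Hadoop API.

```java
/** Hypothetical helper illustrating the float-to-long scaling used with counters. */
final class ScaledCounterValue {

    /** Scale factor: 1000 keeps three decimal digits, as in the answer. */
    static final int SCALE = 1000;

    /** Reducer side: turn the float into a long that a counter can hold. */
    static long encode(float value) {
        // The cast truncates toward zero, so digits past the third decimal are dropped.
        return (long) (value * SCALE);
    }

    /** Driver side: turn the counter value back into a float. */
    static float decode(long counterValue) {
        return counterValue / (float) SCALE;
    }

    public static void main(String[] args) {
        float original = 3.14159f;
        long encoded = encode(original);
        float decoded = decode(encoded);
        System.out.println(original + " -> " + encoded + " -> " + decoded);
    }
}
```

Note that counters are summed across all tasks that increment them. The approach relies on Job1 having a single reducer that increments the counter exactly once; with several reducers the driver would see the sum of their scaled values.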

In the driver class, retrieve the long value and transform it back to a float:

public static void MR1(String inputPath)
        throws IOException, InterruptedException, ClassNotFoundException {

    Configuration conf = new Configuration();
    Job job = new Job(conf, "This is job1");
    job.setJarByClass(MyProgram.class);
    job.setMapperClass(MyMapper1.class);
    job.setReducerClass(MyReduce1.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    FileInputFormat.addInputPath(job, new Path(inputPath));
    job.waitForCompletion(true);

    // Counters are aggregated by the framework and can be read by the driver once the job has finished.
    long result = job.getCounters().findCounter("Result", "Result").getValue();
    float value = result / 1000f;

}
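
Once the driver has the value, one way to hand it to Job2 is through Job2's Configuration, set before the job is created. The following is only a sketch under stated assumptions: the class name `Job2Sketch`, the reducer `MyReduce2`, the property key `myprogram.job1.result`, the extra `job1Result` parameter on `MR2`, and the way the reducer uses the value are all hypothetical. Job2's mapper is elided in the question, so it is left out here as well; the job needs one that emits `Text`/`FloatWritable` pairs before it can run. The code requires the Hadoop client libraries on the classpath.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Job2Sketch {

    // Hypothetical property name; any key not already used by Hadoop would do.
    static final String JOB1_RESULT_KEY = "myprogram.job1.result";

    // Driver side: set the value BEFORE creating the Job, because the Job
    // works on its own copy of the Configuration it is given.
    public static void MR2(String inputPath, String outputPath, float job1Result)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.setFloat(JOB1_RESULT_KEY, job1Result);

        Job job = Job.getInstance(conf, "This is job2");
        job.setJarByClass(Job2Sketch.class);
        // job.setMapperClass(...): Job2's mapper is elided in the question.
        job.setReducerClass(MyReduce2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    public static class MyReduce2
            extends Reducer<Text, FloatWritable, Text, FloatWritable> {

        private float job1Result;

        // Task side: read the value once per task from the job configuration.
        @Override
        protected void setup(Context context) {
            job1Result = context.getConfiguration().getFloat(JOB1_RESULT_KEY, 0f);
        }

        @Override
        protected void reduce(Text key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {
            float s = 0;
            for (FloatWritable val : values) {
                s += val.get();
            }
            // Illustrative use only: offset each sum by the value from Job1.
            context.write(key, new FloatWritable(s - job1Result));
        }
    }
}
```

This direction works where the question's attempt did not, because the configuration flows from the driver to the tasks at submission time, never from the tasks back to the driver.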
