Global variable's value doesn't change after for loop


Problem description


I am developing a Hadoop project. I want to find the customers on a certain day and then write those with the max consumption on that day. In my reducer class, for some reason, the global variable max doesn't change its value after the for loop.

EDIT I want to find the customers with the max consumption on a certain day. I have managed to find the customers on the date I want, but I am facing a problem in my Reducer class. Here is the code:

EDIT #2 I already know that the values (consumption) are natural numbers. So in my output file I want only the customers of a certain day that have the max consumption.

EDIT #3 My input file consists of many records. It has three columns: the customer's id, the timestamp (yyyy-MM-dd HH:mm:ss) and the consumption.

Driver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class alicanteDriver {

    public static void main(String[] args) throws Exception {
        long t_start = System.currentTimeMillis();
        long t_end;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Alicante");
        job.setJarByClass(alicanteDriver.class);

        job.setMapperClass(alicanteMapperC.class);        

        //job.setCombinerClass(alicanteCombiner.class);

        job.setPartitionerClass(alicantePartitioner.class);

        job.setNumReduceTasks(2);

        job.setReducerClass(alicanteReducerC.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));
        job.waitForCompletion(true);
        t_end = System.currentTimeMillis();

        System.out.println((t_end-t_start)/1000);
    }
 }

Mapper class

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class alicanteMapperC extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    String Customer = new String();
    SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date t = new Date();
    IntWritable Consumption = new IntWritable();
    int counter = 0;

    // new vars
    int max = 0;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        Date d2 = null;
        try {
            d2 = ft.parse("2013-07-01 01:00:00");
        } catch (ParseException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        if (counter > 0) {

            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line, ",");

            while (itr.hasMoreTokens()) {
                Customer = itr.nextToken();
                try {
                    t = ft.parse(itr.nextToken());
                } catch (ParseException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                Consumption.set(Integer.parseInt(itr.nextToken()));

                //sort out as many values as possible
                if(Consumption.get() > max) {
                    max = Consumption.get();
                }

                //find customers in a certain date
                if (t.compareTo(d2) == 0 && Consumption.get() == max) {
                    context.write(new Text(Customer), Consumption);
                }
            }
        }
        counter++;
    }
}

Reducer class

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Iterables;

public class alicanteReducerC extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int max = 0; //this var

        // declaration of Lists
        List<Text> l1 = new ArrayList<Text>();
        List<IntWritable> l2 = new ArrayList<IntWritable>();

        for (IntWritable val : values) {
            if (val.get() > max) {
                max = val.get();
            }
            l1.add(key);
            l2.add(val);
        }

        for (int i = 0; i < l1.size(); i++) {
            if (l2.get(i).get() == max) {
                context.write(key, new IntWritable(max));
            }
        }
    }
}

Some values of the input file

C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4

Output should be

C11FA586258 5
C11FA586413 5

I have searched the forums for a couple of hours, and still can't find the issue. Any ideas?

Solution

Here is the refactored code: you can pass/change the specific value for the date of consumption. In this case you don't need the reducer to do the date filtering; the mapper handles it. My first answer queried the max consumption from the input; this answer queries the input for a user-provided date.

The setup method gets the user-provided value of mapper.maxConsumption.date and passes it to the map method. The cleanup method in the reducer scans all max-consumption customers and writes the final maximum found in the input (i.e., 5 in this case); the System.out.println calls in the code below print a detailed execution log.
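
For context (an explanatory note, not part of the original answer): when the job is launched through ToolRunner, GenericOptionsParser moves the -D key=value pairs into the job Configuration before run() sees the remaining arguments, which is what lets the DataMapper's setup method read the date. A minimal sketch of that read; the fallback default value is purely illustrative, not from the original code:

public void setup(Context context) {
    Configuration config = context.getConfiguration();
    // ToolRunner/GenericOptionsParser has already placed the -D property here;
    // the second argument is a hypothetical fallback default
    String date = config.get("mapper.maxConsumption.date", "2013-07-01 01:00:00");
    maxConsumptionDate.set(date);
}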

Run as:

hadoop jar maxConsumption.jar -Dmapper.maxConsumption.date="2013-07-01 01:00:00" Data/input.txt output/maxConsupmtion5


#input:
C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4

#output:
C11FA586258 5
C11FA586413 5


import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class maxConsumption extends Configured implements Tool {

    public static class DataMapper extends Mapper<Object, Text, Text, IntWritable> {
        SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date dateInFile, filterDate;
        int lineno=0;
        private final static Text customer = new Text();
        private final static IntWritable consumption = new IntWritable();
        private final static Text maxConsumptionDate = new Text();

        public void setup(Context context) {
            Configuration config = context.getConfiguration();
            maxConsumptionDate.set(config.get("mapper.maxConsumption.date"));
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
            try{
                lineno++;
                filterDate = ft.parse(maxConsumptionDate.toString());
                //map data from line/file
                String[] fields = value.toString().split(",");
                customer.set(fields[0].trim());
                dateInFile = ft.parse(fields[1].trim());
                consumption.set(Integer.parseInt(fields[2].trim()));

                if(dateInFile.equals(filterDate)) //only send to reducer if date filter matches....
                    context.write(new Text(customer), consumption);
            }catch(Exception e){
                System.err.println("Invaid Data at line: " + lineno + " Error: " + e.getMessage());
            }
        }   
    }

    public  static class DataReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        LinkedHashMap<String, Integer> maxConsumption = new LinkedHashMap<String,Integer>();
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
                throws IOException, InterruptedException {
            int max=0;
            System.out.print("reducer received: " + key + " [ ");
            for(IntWritable value: values){
                System.out.print( value.get() + " ");
                if(value.get() > max)
                    max=value.get();
            }
            System.out.println( " ]");
            System.out.println(key.toString() + "    max is   " + max);
            maxConsumption.put(key.toString(), max);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            int max=0;
            //first find the max from reducer
            for (String key : maxConsumption.keySet()){
                 System.out.println("cleaup customer : " + key.toString() + " consumption : " + maxConsumption.get(key) 
                         + " max: " + max); 
                if(maxConsumption.get(key) > max)
                    max=maxConsumption.get(key);
            }

            System.out.println("final max is: " + max);
            //write only the max value from map
            for (String key : maxConsumption.keySet()){
                if(maxConsumption.get(key) == max)
                    context.write(new Text(key), new IntWritable(maxConsumption.get(key)));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new maxConsumption(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: -Dmapper.maxConsumption.date=\"2013-07-01 01:00:00\" <in> <out>");
            System.exit(2);
        }
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "get-max-consumption");
        job.setJarByClass(maxConsumption.class);
        job.setMapperClass(DataMapper.class);
        job.setReducerClass(DataReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileSystem fs = null;
        Path dstFilePath = new Path(args[1]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        return job.waitForCompletion(true) ? 0 : 1;
    } 
}
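
A side note on the original reducer (my diagnosis; the answer above does not spell it out): Hadoop's reduce-side iterator reuses a single IntWritable instance across iterations, so l2.add(val) stores the same object every time and every list entry ends up holding the last value read, which makes the post-loop comparison against max misbehave. A minimal fix sketch for that loop, copying the value instead of the reused instance:

for (IntWritable val : values) {
    if (val.get() > max) {
        max = val.get();
    }
    l1.add(key);                        // safe here: key does not change within one reduce() call
    l2.add(new IntWritable(val.get())); // copy the value; Hadoop reuses val itself
}

Even with that fix, each reduce() call only sees one key's values, and the original driver sets job.setNumReduceTasks(2), so no single reducer sees all customers. The refactored job above leaves the reduce-task count at its default of 1, which is what makes the cleanup-based global maximum work; with more reducers, each would emit its own partition's maximum.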
