Java MapReduce按日期计算 [英] Java MapReduce counting by date
问题描述
我是Hadoop的新手,我正在尝试制作一个MapReduce程序,按日期计算最大前两个出版物(按月分组)。所以我的输入是这样的:
I'm new to Hadoop, and i'm trying to do a MapReduce program, to count the max first two occurrencise of lecters by date (grouped by month). So my input is of this kind :
2017-06-01 , A, B, A, C, B, E, F
2017-06-02 , Q, B, Q, F, K, E, F
2017-06-03 , A, B, A, R, T, E, E
2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G
所以,我正在考虑这个MapReducer程序的结果,例如:
so, i'm expeting as result of this MapReducer program, something like :
2017-06, A:4, E:4
2017-07, A:4, B:4
public class ArrayGiulioTest {
public static Logger logger = Logger.getLogger(ArrayGiulioTest.class);
public static class CustomMap extends Mapper<LongWritable, Text, Text, TextWritable> {
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
TextWritable array = new TextWritable();
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line, ",");
String dataAttuale = tokenizer.nextToken().substring(0,
line.lastIndexOf("-"));
Text tmp = null;
Text[] tmpArray = new Text[tokenizer.countTokens()];
int i = 0;
while (tokenizer.hasMoreTokens()) {
String prod = tokenizer.nextToken(",");
word.set(dataAttuale);
tmp = new Text(prod);
tmpArray[i] = tmp;
i++;
}
array.set(tmpArray);
context.write(word, array);
}
}
public static class CustomReduce extends Reducer<Text, TextWritable, Text, Text> {
public void reduce(Text key, Iterator<TextWritable> values,
Context context) throws IOException, InterruptedException {
MapWritable map = new MapWritable();
Text txt = new Text();
while (values.hasNext()) {
TextWritable array = values.next();
Text[] tmpArray = (Text[]) array.toArray();
for(Text t : tmpArray) {
if(map.get(t)!= null) {
IntWritable val = (IntWritable) map.get(t);
map.put(t, new IntWritable(val.get()+1));
} else {
map.put(t, new IntWritable(1));
}
}
}
Set<Writable> set = map.keySet();
StringBuffer str = new StringBuffer();
for(Writable k : set) {
str.append("key: " + k.toString() + " value: " + map.get(k) + "**");
}
txt.set(str.toString());
context.write(key, txt);
}
}
public static void main(String[] args) throws Exception {
long inizio = System.currentTimeMillis();
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "countProduct");
job.setJarByClass(ArrayGiulioTest.class);
job.setMapperClass(CustomMap.class);
//job.setCombinerClass(CustomReduce.class);
job.setReducerClass(CustomReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TextWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
long fine = System.currentTimeMillis();
logger.info("**************************************End" + (End-Start));
System.exit(1);
}
}
我已经实现了我的自定义TextWritable以这种方式:
and i've implemented my custom TextWritable in this way :
public class TextWritable extends ArrayWritable {
public TextWritable() {
super(Text.class);
}
}
..所以当我运行我的MapReduce程序时,我获得了这种结果
..so when i run my MapReduce program i obtain a result of this kind
2017-6 wordcount.TextWritable@3e960865
2017-6 wordcount.TextWritable@3e960865
显然我的减速机不起作用。看来我的Mapper输出
it's obvious that my reducer it doesn't works. It seems the output from my Mapper
有什么想法吗?有人可以说,解决方案是否正确?
Any idea? And someone can says if is the right path to the solution?
这里是控制台日志(仅供参考,我的输入文件有6行而不是5行)
*我在eclipse(mono)下获得相同的结果启动MapReduce问题JVM)或使用带有Hdfs的Hadoop
Here Console Log (Just for information, my input file has 6 rows instead of 5) *I obtain the same result starting MapReduce problem under eclipse(mono JVM) or using Hadoop with Hdfs
File System Counters
FILE: Number of bytes read=1216
FILE: Number of bytes written=431465
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=6
Map output records=6
Map output bytes=214
Map output materialized bytes=232
Input split bytes=97
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=232
Reduce input records=6
Reduce output records=6
Spilled Records=12
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
Total committed heap usage (bytes)=394264576
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=208
File Output Format Counters
Bytes Written=1813
推荐答案
主要问题是关于reduce方法的符号:
The main problem is about the sign of the reduce method :
我写的是: public void reduce(文本键,Iterator< TextWritable>值,
上下文上下文)
而不是
public void reduce(Text key, Iterable<ArrayTextWritable> values,
这这就是为什么我获得我的Map输出而不是我的Reduce otuput
This is the reason why i obtain my Map output instead of my Reduce otuput
这篇关于Java MapReduce按日期计算的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!