How to write avro output in hadoop map reduce?


Problem description



I wrote a Hadoop word count program that takes TextInputFormat input and is supposed to output the word count in Avro format.

The Map-Reduce job runs fine, but the output of this job is readable with unix commands such as more or vi. I was expecting this output to be unreadable, since Avro output is in binary format.

I have used a mapper only; there is no reducer. I just want to experiment with Avro, so I am not worried about memory or stack overflow. The following is the code of the mapper:

public class WordCountMapper extends Mapper<LongWritable, Text, AvroKey<String>, AvroValue<Integer>> {

    private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] keys = value.toString().split("[\\s-*,\":]");
        for (String currentKey : keys) {
            int currentCount = 1;
            String currentToken = currentKey.trim().toLowerCase();
            if(wordCountMap.containsKey(currentToken)) {
                currentCount = wordCountMap.get(currentToken);
                currentCount++;
            }
            wordCountMap.put(currentToken, currentCount);
        }
        System.out.println("DEBUG : total number of unique words = " + wordCountMap.size());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<String, Integer> currentKeyValue : wordCountMap.entrySet()) {
            AvroKey<String> currentKey = new AvroKey<String>(currentKeyValue.getKey());
            AvroValue<Integer> currentValue = new AvroValue<Integer>(currentKeyValue.getValue());
            context.write(currentKey, currentValue);
        }
    }
}

and the driver code is as follows:

public int run(String[] args) throws Exception {

    Job avroJob = new Job(getConf());
    avroJob.setJarByClass(AvroWordCount.class);
    avroJob.setJobName("Avro word count");

    avroJob.setInputFormatClass(TextInputFormat.class);
    avroJob.setMapperClass(WordCountMapper.class);

    AvroJob.setInputKeySchema(avroJob, Schema.create(Type.INT));
    AvroJob.setInputValueSchema(avroJob, Schema.create(Type.STRING));

    AvroJob.setMapOutputKeySchema(avroJob, Schema.create(Type.STRING));
    AvroJob.setMapOutputValueSchema(avroJob, Schema.create(Type.INT));

    AvroJob.setOutputKeySchema(avroJob, Schema.create(Type.STRING));
    AvroJob.setOutputValueSchema(avroJob, Schema.create(Type.INT));


    FileInputFormat.addInputPath(avroJob, new Path(args[0]));
    FileOutputFormat.setOutputPath(avroJob, new Path(args[1]));

    return avroJob.waitForCompletion(true) ? 0 : 1;
}

I would like to know what the Avro output should look like, and what I am doing wrong in this program.
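As a side note on what Avro output looks like: a binary Avro container file can be read back with Avro's DataFileReader. The sketch below is not part of the original question, and the output file name is a placeholder:

```java
// Sketch (not from the original post): reading back a binary Avro container
// file to inspect its schema and records.
import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class InspectAvroOutput {
    public static void main(String[] args) throws Exception {
        // "part-00000.avro" is a placeholder for the job's output file.
        DataFileReader<GenericRecord> reader = new DataFileReader<>(
                new File("part-00000.avro"),
                new GenericDatumReader<GenericRecord>());
        // The writer's schema is stored in the file header.
        System.out.println("schema: " + reader.getSchema());
        for (GenericRecord record : reader) {
            System.out.println(record);
        }
        reader.close();
    }
}
```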

Solution

The latest release of the Avro library includes an updated version of their ColorCount example adapted for MRv2. I suggest you look at it and use the same pattern they use in the Reduce class, or just extend AvroMapper. Please note that using the Pair class instead of AvroKey+AvroValue is also essential for running Avro on Hadoop.
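As a rough illustration of the MRv2 pattern the ColorCount example follows (a sketch, not the poster's code; it assumes the org.apache.avro.mapreduce package from avro-mapred), note that the question's driver never sets an Avro output format, so Hadoop likely falls back to the default TextOutputFormat, which would explain the human-readable output:

```java
// Sketch only (not from the original post): driver configuration following
// the MRv2 ColorCount pattern from avro-mapred (org.apache.avro.mapreduce).
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class AvroWordCountDriver {
    public static void configure(Job avroJob) {
        // Plain text in, so no Avro *input* schemas are needed here.
        avroJob.setInputFormatClass(TextInputFormat.class);

        // Schemas for the mapper's AvroKey<String>/AvroValue<Integer> output.
        AvroJob.setMapOutputKeySchema(avroJob, Schema.create(Type.STRING));
        AvroJob.setMapOutputValueSchema(avroJob, Schema.create(Type.INT));
        AvroJob.setOutputKeySchema(avroJob, Schema.create(Type.STRING));
        AvroJob.setOutputValueSchema(avroJob, Schema.create(Type.INT));

        // Without this line, Hadoop uses the default TextOutputFormat and
        // writes plain text instead of binary .avro container files.
        avroJob.setOutputFormatClass(AvroKeyValueOutputFormat.class);
    }
}
```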

