Output a list from a Hadoop Map Reduce job using custom writable


Question

I'm trying to create a simple map reduce job by changing the wordcount example given by hadoop.

I'm trying to output a list instead of a count of the words. The wordcount example gives the following output:

hello 2
world 2

I'm trying to get it to output as a list, which will form the basis of future work:

hello 1 1
world 1 1

I think I'm on the right track, but I'm having trouble writing the list. Instead of the above, I'm getting:

Hello   foo.MyArrayWritable@61250ff2
World   foo.MyArrayWritable@483a0ab1

Here's my MyArrayWritable. I put a sysout in write(DataOutput arg0), but it never output anything, so I think that method might not be called and I don't know why.

class MyArrayWritable extends ArrayWritable{

public MyArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
    super(valueClass, values);
}
public MyArrayWritable(Class<? extends Writable> valueClass) {
    super(valueClass);
}

@Override
public IntWritable[] get() {
    return (IntWritable[]) super.get();
}

@Override
public void write(DataOutput arg0) throws IOException {
    for(IntWritable i : get()){
        i.write(arg0);
    }
}
}

Edit - adding more source code

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
} 

public static class Reduce extends Reducer<Text, IntWritable, Text, MyArrayWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
        for (IntWritable val : values) {
            list.add(val);
        }
        context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
    }
}

public static void main(String[] args) throws Exception {
    if(args == null || args.length == 0)
        args = new String[]{"./wordcount/input","./wordcount/output"};
    Path p = new Path(args[1]);
    FileSystem fs = FileSystem.get(new Configuration());
    fs.exists(p);
    fs.delete(p, true);

    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}

}

Answer

You have a 'bug' in your reducer - the values iterator re-uses the same IntWritable object throughout the loop, so you should copy the value being added to the list as follows:

public void reduce(Text key, Iterable<IntWritable> values, Context context)
                                      throws IOException, InterruptedException {
    ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
    for (IntWritable val : values) {
        list.add(new IntWritable(val.get())); // copy the current value
    }
    context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
}

This isn't actually a problem here, as you're using an ArrayList and your mapper only outputs a single value (one), but it is something that may trip you up if you ever extend this code.
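The object-reuse pitfall can be reproduced without Hadoop at all. The sketch below uses a hypothetical `IntBox` class as a stand-in for `IntWritable`: the "framework" hands back one mutable holder on every iteration, so adding it to a list without copying leaves every element pointing at the same (last-seen) value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hadoop's IntWritable: a mutable int holder
// that the caller re-uses between iterations, like the reducer's values iterator.
class IntBox {
    private int value;
    IntBox(int value) { this.value = value; }
    void set(int value) { this.value = value; }
    int get() { return value; }
}

public class ReusePitfall {
    // Simulates collecting reducer values: the SAME object is handed back
    // each time, mutated in place to hold the next value.
    static List<IntBox> collectWithoutCopy(int[] values) {
        List<IntBox> out = new ArrayList<>();
        IntBox shared = new IntBox(0);          // one object, re-used
        for (int v : values) {
            shared.set(v);
            out.add(shared);                    // bug: every element is the same object
        }
        return out;
    }

    static List<IntBox> collectWithCopy(int[] values) {
        List<IntBox> out = new ArrayList<>();
        IntBox shared = new IntBox(0);
        for (int v : values) {
            shared.set(v);
            out.add(new IntBox(shared.get()));  // fix: copy the current value
        }
        return out;
    }

    public static void main(String[] args) {
        int[] counts = {1, 2, 3};
        // Without copying, every list entry reflects the LAST value seen.
        System.out.println(collectWithoutCopy(counts).get(0).get()); // 3, not 1
        System.out.println(collectWithCopy(counts).get(0).get());    // 1
    }
}
```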

You also need to define in your job that your map and reducer output types are different:

// map output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// reducer output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MyArrayWritable.class);

You might want to explicitly define the number of reducers (which may be why you never see your sysouts being written to the task logs, especially if your cluster admin has defined the default number to be 0):

job.setNumReduceTasks(1);

You're using the default text output format, which calls toString() on the output key and value pairs - MyArrayWritable doesn't have an overridden toString() method, so you should add one to MyArrayWritable:

@Override
public String toString() {
  return Arrays.toString(get());
}
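The `foo.MyArrayWritable@61250ff2` output seen earlier is just the default `Object.toString()`, which prints `ClassName@hashCode`. A minimal stand-alone sketch (no Hadoop classes; the names are illustrative) shows what the override changes:

```java
import java.util.Arrays;

// Illustrates why the job output showed "foo.MyArrayWritable@61250ff2":
// without an override, toString() falls back to Object's ClassName@hashCode.
public class ToStringDemo {
    static class NoOverride {
        int[] values = {1, 1};
        // inherits Object.toString() -> e.g. "ToStringDemo$NoOverride@1b6d3586"
    }

    static class WithOverride {
        int[] values = {1, 1};
        @Override
        public String toString() {
            return Arrays.toString(values);  // -> "[1, 1]"
        }
    }

    public static void main(String[] args) {
        System.out.println(new NoOverride());   // ClassName@<hex hash>
        System.out.println(new WithOverride()); // [1, 1]
    }
}
```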

Finally, remove the overridden write method from MyArrayWritable - it is not a valid implementation compatible with the complementary readFields method. You don't need to override this method, but if you do (say you want to see a sysout to verify it's being called), then do something like this instead:

@Override
public void write(DataOutput arg0) throws IOException {
  System.out.println("write method called");
  super.write(arg0);
}
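The reason the hand-rolled write was invalid: readFields must consume exactly the bytes write produced, in the same order, and the question's version wrote the raw elements without the length information the matching readFields needs to reconstruct the array. A self-contained round-trip sketch of that contract, using a hypothetical `IntArrayRecord` in place of `ArrayWritable`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SymmetryDemo {
    // Hypothetical stand-in for a Writable that holds an int array.
    static class IntArrayRecord {
        int[] values = new int[0];

        // Serialize: length first, then each element.
        void write(DataOutput out) throws IOException {
            out.writeInt(values.length);
            for (int v : values) out.writeInt(v);
        }

        // Deserialize: must mirror write() exactly, field for field.
        void readFields(DataInput in) throws IOException {
            int n = in.readInt();
            values = new int[n];
            for (int i = 0; i < n; i++) values[i] = in.readInt();
        }
    }

    // Writes a record to an in-memory buffer and reads it back.
    static int[] roundTrip(int[] data) {
        try {
            IntArrayRecord rec = new IntArrayRecord();
            rec.values = data;
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            rec.write(new DataOutputStream(buf));

            IntArrayRecord copy = new IntArrayRecord();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(buf.toByteArray())));
            return copy.values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(roundTrip(new int[]{1, 1}))); // [1, 1]
    }
}
```

If write() omitted the length prefix (as the question's override effectively did), readFields() would have no way to know how many ints to read back, and deserialization would break.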

