MapFile as an input to a MapReduce job


Problem Description

I recently started to use Hadoop and I have a problem while using a MapFile as an input to a MapReduce job.

The following working code writes a simple MapFile called "TestMap" in HDFS, with three keys of type Text and three values of type BytesWritable.

Here are the contents of TestMap:

$ hadoop fs  -text /user/hadoop/TestMap/data
11/01/20 11:17:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/01/20 11:17:58 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
11/01/20 11:17:58 INFO compress.CodecPool: Got brand-new decompressor
A    01
B    02
C    03

Here is the program that creates the TestMap Mapfile:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;

public class CreateMap {

    public static void main(String[] args) throws IOException{

        Configuration conf = new Configuration();
        FileSystem hdfs  = FileSystem.get(conf);

        Text key = new Text();
        BytesWritable value = new BytesWritable();
        byte[] data = {1, 2, 3};
        String[] strs = {"A", "B", "C"};
        int bytesRead;
        MapFile.Writer writer = null;

        writer = new MapFile.Writer(conf, hdfs, "TestMap", key.getClass(), value.getClass());
        try {
            for (int i = 0; i < 3; i++) {
                key.set(strs[i]);
                value.set(data, i, 1);
                writer.append(key, value);
                System.out.println(strs[i] + ":" + data[i] + " added.");
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
             IOUtils.closeStream(writer);
        }
    }
}

The simple MapReduce job that follows tries to increment the values of the MapFile by one:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.BytesWritable;


public class AddOne extends Configured implements Tool {

    public static class MapClass extends MapReduceBase

        implements Mapper<Text, BytesWritable, Text, Text> {

        public void map(Text key, BytesWritable value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {


            byte[] data = value.getBytes();
            data[0] += 1;
            value.set(data, 0, 1);
            output.collect(key, new Text(value.toString()));
        }
    }

    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {

            output.collect(key, values.next());
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, AddOne.class);

        Path in = new Path("TestMap");
        Path out = new Path("output");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("AddOne");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ":");


        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AddOne(), args);

        System.exit(res);
    }
}

The runtime exception that I get is:

java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable
    at AddOne$MapClass.map(AddOne.java:32)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

I don't understand why Hadoop is trying to cast a LongWritable, since in my code I define the Mapper interface correctly (Mapper<Text, BytesWritable, Text, Text>).

Could somebody help me?

Thank you very much

Luca

Solution

Your problem comes from the fact that, despite what the name tells you, a MapFile is not a file.

A MapFile is actually a directory that contains two files. There is a "data" file, which is a SequenceFile containing the keys and values you write into it, and there is also an "index" file, which is a different SequenceFile containing a subsequence of the keys along with their offsets as LongWritables. MapFile.Reader loads this index into memory so that, on random access, it can binary-search quickly and find the offset in the data file where the data you want is stored.
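For illustration, here is a minimal sketch (not from the original post, and assuming the same "TestMap" path as above) of how a MapFile is normally read. MapFile.Reader opens both files in the directory and uses the in-memory index to seek into the data file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class ReadMap {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);

        // Open the MapFile directory; the reader loads "index" into memory
        // and binary-searches it to seek into "data".
        MapFile.Reader reader = new MapFile.Reader(hdfs, "TestMap", conf);
        try {
            BytesWritable value = new BytesWritable();
            // Random access by key: fills "value" and returns it for key "B",
            // or returns null if the key is not present.
            if (reader.get(new Text("B"), value) != null) {
                System.out.println("B -> " + value);
            }
        } finally {
            reader.close();
        }
    }
}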

You're using the old "org.apache.hadoop.mapred" version of SequenceFileInputFormat. When you point it at a MapFile as input, it is not smart enough to look only at the data file; it treats both the data file and the index file as regular input files. The data file will work correctly, because its classes agree with what you specified, but the index file will throw the ClassCastException, because its values are all LongWritables.
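You can verify this with a small check (a hypothetical sketch, not part of the original answer): open the MapFile's "index" file directly as a SequenceFile and print its key and value classes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class InspectIndex {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);

        // Open the index file of the "TestMap" MapFile directly.
        SequenceFile.Reader index =
            new SequenceFile.Reader(hdfs, new Path("TestMap/index"), conf);
        try {
            // The keys are the MapFile keys, but the values are LongWritable
            // offsets into the data file -- exactly what the mapper receives
            // when this file is fed in as regular input.
            System.out.println("key class:   " + index.getKeyClassName());
            System.out.println("value class: " + index.getValueClassName());
        } finally {
            index.close();
        }
    }
}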

You have two options: you can start using the "org.apache.hadoop.mapreduce" version of SequenceFileInputFormat (thus changing other parts of your code), which knows enough about MapFiles to look only at the data file; or, instead, you can explicitly give the data file as the input path, as sketched below.
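A minimal sketch of the second option, assuming the paths used in the question; only the relevant lines of AddOne.run() change, so the old-API SequenceFileInputFormat never sees the index file:

        // Read only the SequenceFile that holds the key/value pairs;
        // the "index" file inside the TestMap directory is skipped entirely.
        Path in = new Path("TestMap/data");
        Path out = new Path("output");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

With this change the job only reads Text/BytesWritable records from the data file, so the mapper's declared input types match what actually arrives.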
