Hadoop reduce side join using DataJoin


Problem Description

I am using the following code to do the reduce-side join:

/*
 * HadoopMapper.java
 *
 * Created on Apr 8, 2012, 5:39:51 PM
 */


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// import org.apache.commons.logging.Log;
// import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.*; 

/**
 *
 * @author 
 */
public class DataJoin extends Configured implements Tool 
    {
        public static class MapClass extends DataJoinMapperBase 
            {
                protected Text generateInputTag(String inputFile) 
                    {
                        String datasource = inputFile.split("-")[0];
                        return new Text(datasource);
                    }
            protected Text generateGroupKey(TaggedMapOutput aRecord) 
                {
                    String line = ((Text) aRecord.getData()).toString();
                    String[] tokens = line.split(",");
                    String groupKey = tokens[0];
                    return new Text(groupKey);
                }
            protected TaggedMapOutput generateTaggedMapOutput(Object value) 
                {
                    TaggedWritable retv = new TaggedWritable((Text) value);
                    retv.setTag(this.inputTag);
                    return retv;
                }
            }
        public static class Reduce extends DataJoinReducerBase 
            {
                protected TaggedMapOutput combine(Object[] tags, Object[] values) 
                    {
                        if (tags.length < 2) return null;
                        String joinedStr = "";
                        for (int i=0; i<values.length; i++) 
                        {
                            if (i > 0) joinedStr += ",";
                            TaggedWritable tw = (TaggedWritable) values[i];
                            String line = ((Text) tw.getData()).toString();
                            String[] tokens = line.split(",", 2);
                            joinedStr += tokens[1];
                        }
                        TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
                        retv.setTag((Text) tags[0]);
                        return retv;
                    }
            }
        public static class TaggedWritable extends TaggedMapOutput 
            {
                private Writable data;
                public TaggedWritable(Writable data) 
                    {
                        this.tag = new Text("");
                        this.data = data;
                    }

                public Writable getData() 
                    {
                        return data;
                    }
                public void write(DataOutput out) throws IOException
                    {
                        this.tag.write(out);
                        this.data.write(out);
                    }
                public void readFields(DataInput in) throws IOException 
                    {
                        this.tag.readFields(in);
                        this.data.readFields(in);
                    }
            }
        public int run(String[] args) throws Exception 
            {
                Configuration conf = getConf();
                JobConf job = new JobConf(conf, DataJoin.class);
                String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
                if (otherArgs.length != 2) 
                {
                    System.err.println("Usage: wordcount <in> <out>");
                    System.exit(2);
                }

                Path in = new Path(args[0]);
                Path out = new Path(args[1]);
                FileInputFormat.setInputPaths(job, in);
                FileOutputFormat.setOutputPath(job, out);
                job.setJobName("DataJoin");
                job.setMapperClass(MapClass.class);
                job.setReducerClass(Reduce.class);
                job.setInputFormat(TextInputFormat.class);
                job.setOutputFormat(TextOutputFormat.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(TaggedWritable.class);
                job.set("mapred.textoutputformat.separator", ",");
                JobClient.runJob(job);
                return 0;
            }
        public static void main(String[] args) throws Exception 
            {
                int res = ToolRunner.run(new Configuration(),
                new DataJoin(),
                args);
                System.exit(res);
            }
    }

I am able to compile my code. When I run it in Hadoop, I get the following error with the combiner:

12/04/17 19:59:29 INFO mapred.JobClient:  map 100% reduce 27%
12/04/17 19:59:38 INFO mapred.JobClient:  map 100% reduce 30%
12/04/17 19:59:47 INFO mapred.JobClient:  map 100% reduce 33%
12/04/17 20:00:23 INFO mapred.JobClient: Task Id : attempt_201204061316_0018_r_000000_2, Status : FAILED
java.lang.RuntimeException: java.lang.NoSuchMethodException: DataJoin$TaggedWritable.<init>()
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:62)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
        at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1136)
        at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1076)
        at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:246)
        at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:242)
        at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106)

The command I use to run Hadoop is:

    /hadoop/core/bin/hadoop jar /export/scratch/lopez/Join/DataJoin.jar DataJoin /export/scratch/user/lopez/Join /export/scratch/user/lopez/Join_Output

The DataJoin.jar file has DataJoin$TaggedWritable packaged in it.

I checked some forums and found that the error may occur due to a non-static class. My program has no non-static classes!

Could someone please help me?


Thank you Chris, I edited as you said. I updated my code to take in two files, but I am getting the same error message.

I do get the message INFO mapred.FileInputFormat: Total input paths to process : 2

The error is:

     Status : FAILED
    java.lang.ArrayIndexOutOfBoundsException: 1
    at DataJoin$Reduce.combine(DataJoin.java:69)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:205)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:214)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:214)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:181)
    at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:135)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:468)



Here is my updated run method:

public int run(String[] args) throws Exception
{


    Configuration conf = getConf();
    JobConf job = new JobConf(conf, DataJoin.class);
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) 
    {
      System.err.println("Usage: wordcount <in> <in1> <out>");
      System.exit(2);
    }

    Path in = new Path(args[0]);
    Path in1 = new Path(args[1]);
    Path out = new Path(args[2]);
    FileInputFormat.setInputPaths(job,in,in1);
    FileOutputFormat.setOutputPath(job, out);
    job.setJobName("DataJoin");
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TaggedWritable.class);
    job.set("mapred.textoutputformat.separator", ",");
    JobClient.runJob(job);
    return 0;

}

Solution

You need a default constructor for TaggedWritable (Hadoop uses reflection to create this object, and requires a default, no-args constructor).
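
For context, this is roughly what the reduce side is doing at the point in the stack trace above: it creates a fresh instance of the value class via reflection and only then calls readFields on it. The standalone sketch below (NoArgCtorDemo is a hypothetical name, not part of the original code) reproduces the NoSuchMethodException against the question's TaggedWritable, assuming the DataJoin class above is on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

public class NoArgCtorDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // ReflectionUtils.newInstance looks up the no-arg constructor of the
        // target class. With only TaggedWritable(Writable data) defined, this
        // fails with java.lang.RuntimeException caused by
        // java.lang.NoSuchMethodException: DataJoin$TaggedWritable.<init>()
        DataJoin.TaggedWritable value =
                ReflectionUtils.newInstance(DataJoin.TaggedWritable.class, conf);
        System.out.println("Created: " + value);
    }
}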

You also have a problem in your readFields method: you call data.readFields(in) on the Writable interface, but the deserializing code has no knowledge of the actual runtime class of data.

I suggest you either write out the data class name before outputting the data object itself, or look into the GenericWritable class (you'll need to extend it to define the set of allowable writable classes that can be used; a sketch follows the amended code below).

So you could amend TaggedWritable as follows:

public static class TaggedWritable extends TaggedMapOutput {
    private Writable data;

    // No-arg constructor: required so Hadoop can instantiate the class via reflection
    public TaggedWritable() {
        this.tag = new Text();
    }

    public TaggedWritable(Writable data) {
        this.tag = new Text("");
        this.data = data;
    }

    public Writable getData() {
        return data;
    }

    public void setData(Writable data) {
        this.data = data;
    }

    public void write(DataOutput out) throws IOException {
        this.tag.write(out);
        // Record the concrete class of data so readFields knows what to recreate
        out.writeUTF(this.data.getClass().getName());
        this.data.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        this.tag.readFields(in);
        String dataClz = in.readUTF();
        try {
            if (this.data == null
                    || !this.data.getClass().getName().equals(dataClz)) {
                // ReflectionUtils here is org.apache.hadoop.util.ReflectionUtils;
                // a null Configuration is fine for plain Writables such as Text
                this.data = (Writable) ReflectionUtils.newInstance(
                        Class.forName(dataClz), null);
            }
        } catch (ClassNotFoundException e) {
            // Class.forName declares a checked exception, so rewrap it
            throw new IOException("Unknown Writable class: " + dataClz, e);
        }
        this.data.readFields(in);
    }
}
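
Alternatively, if you go the GenericWritable route mentioned above, a minimal sketch could look like the following (JoinValueWritable is a hypothetical name; this is not from the original post). GenericWritable writes a compact index into the getTypes() array instead of the full class name, so every class you might wrap has to be listed there up front:

import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class JoinValueWritable extends GenericWritable {

    // Every class that may be wrapped must appear in this array; its index is
    // what gets serialized, so the order must not change between jobs.
    @SuppressWarnings("unchecked")
    private static final Class<? extends Writable>[] TYPES =
            new Class[] { Text.class };

    // No-arg constructor, again required for reflective instantiation.
    public JoinValueWritable() {
    }

    public JoinValueWritable(Writable instance) {
        set(instance);
    }

    @Override
    protected Class<? extends Writable>[] getTypes() {
        return TYPES;
    }
}

You would then keep an instance of this class as TaggedWritable's data field (delegating to its write/readFields), and call get() on the reduce side to recover the wrapped Text, so no class name has to be written to the stream by hand.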
