Passing objects to MapReduce from a driver

Problem description
I created a driver which reads a config file, builds a list of objects (based on the config) and passes that list to MapReduce (MapReduce has a static attribute which holds a reference to that list of object).

It works, but only locally. As soon as I run the job on a cluster, I get all sorts of errors suggesting that the list hasn't been built. This makes me think I'm doing it wrong, and that on a cluster MapReduce runs independently of the driver.

My question is how to correctly initialise a Mapper.

(I'm using Hadoop 2.4.1)

Solution

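This is the problem of side data distribution. On a cluster, each map and reduce task runs in its own JVM, usually on a different node from the driver. A static field populated in the driver's JVM is therefore never initialized in the tasks, so the data has to travel with the job instead. This is also why it works locally: the local job runner executes the tasks inside the driver's JVM.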

There are two approaches for side data distribution.

1) Distributed Caches

2) Configuration

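Because you have objects to share, you can use the Configuration class. The configuration is shipped to every task, so this approach suits small amounts of data. Larger side data is better distributed through the distributed cache.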

This approach relies on the Configuration class to make an object available across the cluster, to all Mappers and/or Reducers. The idea is simple, and it uses the set(String name, String value) setter of the Configuration class. The object to be shared is serialized into a Java string on the driver side, then deserialized back into an object in the Mapper or Reducer.

In the example code below, I have used the com.google.gson.Gson class for simple serialization and deserialization. You can use Java serialization as well.
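For the Java serialization alternative, one detail matters. ObjectOutputStream produces raw bytes, not text. Configuration values are strings and are written out as XML with the job, so the bytes need a text encoding first. The sketch below uses Base64 for that and assumes Java 8+ (java.util.Base64). The class JavaSerdeSketch, its helpers toBase64/fromBase64, the Rule class and the "rules" key are all hypothetical names for illustration. Because the whole list travels as a single value, the mapper does not need to know how many objects the driver built.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

public class JavaSerdeSketch {

    // Hypothetical stand-in for the objects the driver builds from its config file.
    public static class Rule implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;
        public final int threshold;

        public Rule(String name, int threshold) {
            this.name = name;
            this.threshold = threshold;
        }
    }

    // Driver side: object -> bytes -> Base64 text that is safe to store as a config value.
    public static String toBase64(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Task side: Base64 text -> bytes -> object of the expected type.
    public static <T> T fromBase64(String encoded, Class<T> type)
            throws IOException, ClassNotFoundException {
        byte[] data = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return type.cast(in.readObject());
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<Rule> rules = new ArrayList<>();
        rules.add(new Rule("a", 1));
        rules.add(new Rule("b", 2));

        // In a real job: conf.set("rules", encoded) in the driver.
        String encoded = toBase64(rules);

        // In a real job: fromBase64(conf.get("rules"), ArrayList.class) in Mapper.setup().
        @SuppressWarnings("unchecked")
        List<Rule> restored = fromBase64(encoded, ArrayList.class);
        System.out.println(restored.size() + " " + restored.get(1).name);
    }
}
```

Compared with Gson, this needs no extra library, but every shared class must implement Serializable. The encoded string is also larger and not human-readable.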

The class that represents the object you need to share

 public class TestBean {
    String string1;
    String string2;
    public TestBean(String test1, String test2) {
        super();
        this.string1 = test1;
        this.string2 = test2;
    }
    public TestBean() {
        this("", "");
    }
    public String getString1() {
        return string1;
    }
    public void setString1(String test1) {
        this.string1 = test1;
    }
    public String getString2() {
        return string2;
    }
    public void setString2(String test2) {
        this.string2 = test2;
    }
}

The driver (main) class, where you set the configuration

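import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.google.gson.Gson;

public class GSONTestDriver {
    public static void main(String[] args) throws Exception {
        System.out.println("In Main");
        Configuration conf = new Configuration();
        TestBean testB1 = new TestBean("Hello1", "Gson1");
        TestBean testB2 = new TestBean("Hello2", "Gson2");

        // Serialize each bean to a JSON string and store it under its own key.
        Gson gson = new Gson();
        conf.set("instance1", gson.toJson(testB1));
        conf.set("instance2", gson.toJson(testB2));

        // Job.getInstance copies conf, so the values must be set before this call
        // (later changes would have to go through job.getConfiguration()).
        Job job = Job.getInstance(conf, "GSON Test");
        job.setJarByClass(GSONTestDriver.class);
        job.setMapperClass(GSONTestMapper.class);
        job.setNumReduceTasks(0); // map-only job: the mapper output is the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}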

The mapper class, where you retrieve the objects

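import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.google.gson.Gson;

public class GSONTestMapper extends
        Mapper<LongWritable, Text, Text, NullWritable> {
    // Deserialized once per task in setup(), then available to every map() call.
    private TestBean tb1;
    private TestBean tb2;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Gson gson = new Gson();
        // conf.get returns null for a key the driver never set; Gson then returns null.
        tb1 = gson.fromJson(conf.get("instance1"), TestBean.class);
        tb2 = gson.fromJson(conf.get("instance2"), TestBean.class);
        if (tb1 == null || tb2 == null) {
            throw new IOException("instance1/instance2 missing from the job configuration");
        }
        // Printed to the task's stdout log, not to the driver's console.
        System.out.println(tb1.getString1() + " " + tb1.getString2());
        System.out.println(tb2.getString1() + " " + tb2.getString2());
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}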

The bean is converted to a serialized JSON string using the toJson(Object src) method of com.google.gson.Gson. The JSON string is then passed as a value through the Configuration instance and read by key in the Mapper. There it is deserialized with the fromJson(String json, Class<T> classOfT) method of the same Gson class. You can substitute your own objects for the test bean. Those classes must also be available to the tasks, for example packaged in the job jar, along with the Gson library if it is not already on the cluster's classpath.
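The example above uses one hard-coded key per object. The question, however, describes a list whose length depends on a config file. One option is to store each already-serialized item under a numbered key, plus a count key, so the mapper can read back however many items the driver wrote. The sketch below is hypothetical: IndexedConfigList, putList, getList and the ".count"/".i" key layout are illustrative names, not a Hadoop API. Assuming Java 8+, you would pass conf::set and conf::get from a real Configuration. Here a HashMap stands in for it so the sketch runs without Hadoop. The JSON strings in main are only sample values of the kind Gson produces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class IndexedConfigList {

    // Writes "<prefix>.count" and then each serialized item under "<prefix>.<i>".
    // With Hadoop (hypothetical usage): putList(conf::set, "beans", jsonStrings).
    public static void putList(BiConsumer<String, String> setter, String prefix, List<String> items) {
        setter.accept(prefix + ".count", Integer.toString(items.size()));
        for (int i = 0; i < items.size(); i++) {
            setter.accept(prefix + "." + i, items.get(i));
        }
    }

    // Reads the items back in their original order.
    // With Hadoop (hypothetical usage): getList(conf::get, "beans") in Mapper.setup().
    public static List<String> getList(Function<String, String> getter, String prefix) {
        String count = getter.apply(prefix + ".count");
        if (count == null) {
            throw new IllegalStateException("missing key " + prefix + ".count");
        }
        int n = Integer.parseInt(count);
        List<String> items = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            items.add(getter.apply(prefix + "." + i));
        }
        return items;
    }

    public static void main(String[] args) {
        // A HashMap stands in for the job Configuration in this sketch.
        Map<String, String> fakeConf = new HashMap<>();
        List<String> json = new ArrayList<>();
        json.add("{\"string1\":\"Hello1\",\"string2\":\"Gson1\"}");
        json.add("{\"string1\":\"Hello2\",\"string2\":\"Gson2\"}");
        putList(fakeConf::put, "beans", json);
        System.out.println(getList(fakeConf::get, "beans"));
    }
}
```

Serializing the whole list into a single value is simpler when the list is small. Numbered keys keep each entry individually readable in the job configuration.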
