Passing objects to MapReduce from a driver


Problem description

I created a driver that reads a config file, builds a list of objects based on that config, and passes the list to MapReduce (the MapReduce class has a static attribute that holds a reference to the list).

It works, but only locally. As soon as I run the job on a cluster configuration, I get all sorts of errors suggesting that the list hasn't been built. This makes me think I'm doing it wrong, and that on a cluster MapReduce runs independently of the driver.

My question is how to initialise a Mapper correctly.

(I'm using Hadoop 2.4.1.)

Recommended answer

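This is a side data distribution problem. On a cluster, map and reduce tasks run in their own JVMs, usually on other machines, so a static field set in the driver is never visible to them.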

There are two approaches to side data distribution:

1) Distributed cache

2) Configuration

Since you have objects to share, we can use the Configuration class.

This approach relies on the Configuration class to make an object available across the cluster, accessible to all Mappers and/or Reducers. The approach is quite simple: the set(String, String) setter of the Configuration class does the work. The object to be shared is serialized into a Java string on the driver side and deserialized back into an object in the Mapper or Reducer.

In the example code below, I have used the com.google.gson.Gson class for easy serialization and deserialization. You can use Java serialization as well.
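For the Java serialization alternative mentioned above, a minimal sketch might look like the following. It is not part of the original answer: the class names `SerializationSketch` and `SerializableBean` are hypothetical, and `java.util.Base64` assumes Java 8 or later. The idea is to turn the object's bytes into a Base64 string so that it fits the String-valued Configuration API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

public class SerializationSketch {

    // Hypothetical bean; Java serialization requires it to implement Serializable.
    public static class SerializableBean implements Serializable {
        private static final long serialVersionUID = 1L;
        final String string1;
        final String string2;

        public SerializableBean(String string1, String string2) {
            this.string1 = string1;
            this.string2 = string2;
        }
    }

    // Driver side: serialize any Serializable object into a plain String.
    public static String toBase64(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Task side: rebuild the object from the String.
    public static Object fromBase64(String encoded)
            throws IOException, ClassNotFoundException {
        byte[] data = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // ArrayList is itself Serializable, so a whole list can travel as one value.
        ArrayList<SerializableBean> beans = new ArrayList<>();
        beans.add(new SerializableBean("Hello1", "Java1"));
        beans.add(new SerializableBean("Hello2", "Java2"));

        // In a real job this string would be stored with conf.set("beans", encoded).
        String encoded = toBase64(beans);

        @SuppressWarnings("unchecked")
        List<SerializableBean> restored = (List<SerializableBean>) fromBase64(encoded);
        for (SerializableBean b : restored) {
            System.out.println(b.string1 + " " + b.string2);
        }
    }
}
```

In a job, the driver would pass the encoded string to conf.set(...) and the Mapper's setup() would call the decoding helper on conf.get(...). The bean class has to be on the task's classpath in either case.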

The class representing the object you need to share

public class TestBean {
    String string1;
    String string2;
    public TestBean(String test1, String test2) {
        super();
        this.string1 = test1;
        this.string2 = test2;
    }
    public TestBean() {
        this("", "");
    }
    public String getString1() {
        return string1;
    }
    public void setString1(String test1) {
        this.string1 = test1;
    }
    public String getString2() {
        return string2;
    }
    public void setString2(String test2) {
        this.string2 = test2;
    }
}

The main (driver) class, where you set the configuration

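import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.google.gson.Gson;

public class GSONTestDriver {
    public static void main(String[] args) throws Exception {
        System.out.println("In Main");
        Configuration conf = new Configuration();
        TestBean testB1 = new TestBean("Hello1", "Gson1");
        TestBean testB2 = new TestBean("Hello2", "Gson2");
        // Serialize each bean to a JSON string.
        Gson gson = new Gson();
        String testSerialization1 = gson.toJson(testB1);
        String testSerialization2 = gson.toJson(testB2);
        // Set the values before creating the Job: the Job copies the Configuration.
        conf.set("instance1", testSerialization1);
        conf.set("instance2", testSerialization2);
        // Job.getInstance replaces the deprecated Job constructor in Hadoop 2.x.
        Job job = Job.getInstance(conf, "GSON Test");
        job.setJarByClass(GSONTestDriver.class);
        job.setMapperClass(GSONTestMapper.class);
        job.setNumReduceTasks(0); // map-only job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}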

The mapper class, where you retrieve the objects

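import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.google.gson.Gson;

public class GSONTestMapper extends
        Mapper<LongWritable, Text, Text, NullWritable> {
    // Beans rebuilt once per task in setup(), then available to every map() call.
    private TestBean tb1;
    private TestBean tb2;

    @Override
    public void setup(Context context) {
        // Each task receives its own copy of the job configuration.
        Configuration conf = context.getConfiguration();
        Gson gson = new Gson();
        // Deserialize the JSON strings the driver stored under these keys.
        tb1 = gson.fromJson(conf.get("instance1"), TestBean.class);
        tb2 = gson.fromJson(conf.get("instance2"), TestBean.class);
        System.out.println(tb1.getString1());
        System.out.println(tb1.getString2());
        System.out.println(tb2.getString1());
        System.out.println(tb2.getString2());
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}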

The bean is converted to a serialized JSON string using the toJson(Object src) method of the com.google.gson.Gson class. The serialized JSON string is then passed as a value through the Configuration instance and accessed by name from the Mapper. There, the string is deserialized using the fromJson(String json, Class<T> classOfT) method of the same Gson class. You can substitute your own objects for my test bean.
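Since the question involves a list of objects rather than two fixed instances, one possible variation is to store an element count plus one key per field. The sketch below is an illustration under stated assumptions, not the answer's method: `java.util.Properties` stands in for Hadoop's Configuration so the example runs without Hadoop, and the class name `IndexedKeysSketch` and the key layout (`bean.count`, `bean.0.string1`, and so on) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class IndexedKeysSketch {

    // Driver side: write the element count, then one key per field of each element.
    public static void store(Properties conf, String prefix, List<String[]> pairs) {
        conf.setProperty(prefix + ".count", Integer.toString(pairs.size()));
        for (int i = 0; i < pairs.size(); i++) {
            conf.setProperty(prefix + "." + i + ".string1", pairs.get(i)[0]);
            conf.setProperty(prefix + "." + i + ".string2", pairs.get(i)[1]);
        }
    }

    // Task side: read the count back, then rebuild each element from its keys.
    public static List<String[]> load(Properties conf, String prefix) {
        int count = Integer.parseInt(conf.getProperty(prefix + ".count", "0"));
        List<String[]> pairs = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            pairs.add(new String[] {
                conf.getProperty(prefix + "." + i + ".string1"),
                conf.getProperty(prefix + "." + i + ".string2")
            });
        }
        return pairs;
    }

    public static void main(String[] args) {
        Properties conf = new Properties(); // stand-in for Hadoop's Configuration
        List<String[]> beans = new ArrayList<>();
        beans.add(new String[] {"Hello1", "Gson1"});
        beans.add(new String[] {"Hello2", "Gson2"});

        store(conf, "bean", beans);

        for (String[] pair : load(conf, "bean")) {
            System.out.println(pair[0] + " " + pair[1]);
        }
    }
}
```

With a real Configuration, the corresponding calls would be conf.set(key, value) and conf.get(key), with conf.setInt and conf.getInt(key, defaultValue) for the count. This keeps every value human-readable in the job configuration, at the cost of writing the field mapping by hand.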
