In MapReduce, how to send an ArrayList as a value from the mapper to the reducer



How can we pass an ArrayList as a value from the mapper to the reducer?

My code works with a certain set of rules and creates new values (Strings) based on those rules. I maintain all the outputs (generated after the rule execution) in a list, and now need to send this output (the mapper value) to the reducer, but I do not have a way to do so.

Can someone please point me in a direction?

Adding Code

package develop;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import utility.RulesExtractionUtility;

public class CustomMap{


    public static class CustomerMapper extends Mapper<Object, Text, Text, Text> {
        private Map<String, String> rules;
        @Override
        public void setup(Context context)
        {

            try
            {
                URI[] cacheFiles = context.getCacheFiles();
                setupRulesMap(cacheFiles[0].toString());
            }
            catch (IOException ioe)
            {
                System.err.println("Error reading state file.");
                System.exit(1);
            }

        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

//          Map<String, String> rules = new LinkedHashMap<String, String>();
//          rules.put("targetcolumn[1]", "ASSIGN(source[0])");
//          rules.put("targetcolumn[2]", "INCOME(source[2]+source[3])");
//          rules.put("targetcolumn[3]", "ASSIGN(source[1]");

//          Above is the "rules", which would basically create some list values from source file

            String [] splitSource = value.toString().split(" ");

            List<String>lists=RulesExtractionUtility.rulesEngineExecutor(splitSource,rules);

//          lists would have values like (name, age) for each line from a huge text file, which is what i want to write in context and pass it to the reducer.
//          As of now i havent implemented the reducer code, as m stuck with passing the value from mapper.

//          context.write(new Text(), lists);---- I do not have a way of doing this


        }




        private void setupRulesMap(String filename) throws IOException
        {
            Map<String, String> rule = new LinkedHashMap<String, String>();
            BufferedReader reader = new BufferedReader(new FileReader(filename));
            String line = reader.readLine();
            while (line != null)
            {
                String[] split = line.split("=");
                rule.put(split[0], split[1]);
                line = reader.readLine();

                // rules logic
            }
            reader.close();
            rules = rule;
        }
    }
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException, URISyntaxException {

        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: customerMapper <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(CustomMap.class);
        job.setMapperClass(CustomerMapper.class);
        job.addCacheFile(new URI("Some HDFS location"));

        URI[] cacheFiles = job.getCacheFiles();
        if (cacheFiles != null) {
            for (URI cacheFile : cacheFiles) {
                System.out.println("Cache file ->" + cacheFile);
            }
        }
        // job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
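Since every entry in `lists` is already a plain String, one library-free way to get a record's list to the reducer is to join its entries into a single delimited `Text` value and split it back in the reducer. A minimal sketch of that encoding in plain Java (the class name and the `\u0001` delimiter are illustrative assumptions; pick any character that cannot occur in the data):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: pack the mapper's List<String> into one delimited
// String (the future Text value), then unpack it on the reducer side.
public class ListAsTextDemo {
    // Assumed delimiter; must not appear in the data itself.
    static final String SEP = "\u0001";

    static String pack(List<String> values) {
        return String.join(SEP, values);
    }

    static List<String> unpack(String packed) {
        // -1 keeps trailing empty fields intact
        return Arrays.asList(packed.split(SEP, -1));
    }

    public static void main(String[] args) {
        List<String> lists = Arrays.asList("name,age", "john,25");
        String value = pack(lists);
        System.out.println(unpack(value));  // prints [name,age, john,25]
    }
}
```

In the mapper this would become something like `context.write(someKey, new Text(pack(lists)))`, with the reducer calling `unpack(value.toString())` to recover the list.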

Solution

To pass an ArrayList from the mapper to the reducer, the object must implement the Writable interface. Why don't you try this library?

<dependency>
    <groupId>org.apache.giraph</groupId>
    <artifactId>giraph-core</artifactId>
    <version>1.1.0-hadoop2</version>
</dependency>

It has an abstract class:

public abstract class ArrayListWritable<M extends org.apache.hadoop.io.Writable>
extends ArrayList<M>
implements org.apache.hadoop.io.Writable, org.apache.hadoop.conf.Configurable

You could create your own subclass, filling in the abstract methods and implementing the interface methods with your own code. For instance:

public class MyListWritable extends ArrayListWritable<Text>{
    ...
}
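Under the hood, any such Writable list serializes itself by writing the element count followed by each element, and reads them back in the same order. A minimal, runnable sketch of that round trip in plain Java, with no Hadoop dependency (the class and method names are illustrative, and `writeUTF` merely stands in for `Text`'s own serialization):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the write()/readFields() contract a Writable list fulfils:
// serialize the size, then each element; deserialize in the same order.
public class WritableListSketch {

    static byte[] write(List<String> list) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(list.size());   // element count first
        for (String s : list) {
            out.writeUTF(s);         // stand-in for each element's serialization
        }
        out.flush();
        return buf.toByteArray();
    }

    static List<String> readFields(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int size = in.readInt();
        List<String> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            list.add(in.readUTF());
        }
        return list;
    }

    public static void main(String[] args) throws IOException {
        List<String> lists = new ArrayList<>();
        lists.add("name,age");
        lists.add("john,25");
        System.out.println(readFields(write(lists)));  // prints [name,age, john,25]
    }
}
```

With a real `ArrayListWritable<Text>` subclass, the framework performs this serialization for you, so the mapper can simply call `context.write(key, myListWritable)` once the job's output value class is set accordingly.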
