Hadoop - Writing to HBase directly from the Mapper


Question



I have a Hadoop job whose output should be written to HBase. I do not really need a reducer; the row I would like to insert is determined in the Mapper.

How can I use TableOutputFormat to achieve this? In all the examples I have seen, the assumption is that the reducer is the one creating the Put, and that TableMapper is only for reading from an HBase table.

In my case the input is HDFS and the output is a Put to a specific table. I cannot find anything in TableMapReduceUtil that can help me with that either.

Is there any example out there that can help me with this?

BTW, I am using the new Hadoop API.

Solution

Here is an example that reads a file and puts every line into HBase. It comes from "HBase: The Definitive Guide", and you can find it in the book's repository. To get it, just clone the repo onto your machine:

git clone git://github.com/larsgeorge/hbase-book.git

The book also explains the code in detail, but if anything is unclear, feel free to ask.

```java
import java.io.IOException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ImportFromFile {
  public static final String NAME = "ImportFromFile";
  public enum Counters { LINES }

  static class ImportMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
    private byte[] family = null;
    private byte[] qualifier = null;

    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      // Split the "family:qualifier" string passed in via the configuration.
      String column = context.getConfiguration().get("conf.column");
      byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
      family = colkey[0];
      if (colkey.length > 1) {
        qualifier = colkey[1];
      }
    }

    @Override
    public void map(LongWritable offset, Text line, Context context)
        throws IOException {
      try {
        String lineString = line.toString();
        // Use the MD5 hash of the line as the row key.
        byte[] rowkey = DigestUtils.md5(lineString);
        Put put = new Put(rowkey);
        put.add(family, qualifier, Bytes.toBytes(lineString));
        // The Mapper emits the Put directly; TableOutputFormat writes it.
        context.write(new ImmutableBytesWritable(rowkey), put);
        context.getCounter(Counters.LINES).increment(1);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  private static CommandLine parseArgs(String[] args) throws ParseException {
    Options options = new Options();
    Option o = new Option("t", "table", true,
      "table to import into (must exist)");
    o.setArgName("table-name");
    o.setRequired(true);
    options.addOption(o);
    o = new Option("c", "column", true,
      "column to store row data into (must exist)");
    o.setArgName("family:qualifier");
    o.setRequired(true);
    options.addOption(o);
    o = new Option("i", "input", true,
      "the directory or file to read from");
    o.setArgName("path-in-HDFS");
    o.setRequired(true);
    options.addOption(o);
    options.addOption("d", "debug", false, "switch on DEBUG log level");
    CommandLineParser parser = new PosixParser();
    CommandLine cmd = null;
    try {
      cmd = parser.parse(options, args);
    } catch (Exception e) {
      System.err.println("ERROR: " + e.getMessage() + "\n");
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp(NAME + " ", options, true);
      System.exit(-1);
    }
    return cmd;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs =
      new GenericOptionsParser(conf, args).getRemainingArgs();
    CommandLine cmd = parseArgs(otherArgs);
    String table = cmd.getOptionValue("t");
    String input = cmd.getOptionValue("i");
    String column = cmd.getOptionValue("c");
    conf.set("conf.column", column);
    Job job = new Job(conf, "Import from file " + input + " into table " + table);

    job.setJarByClass(ImportFromFile.class);
    job.setMapperClass(ImportMapper.class);
    job.setOutputFormatClass(TableOutputFormat.class);
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    // Map-only job: the Puts go straight from the Mapper to TableOutputFormat.
    job.setNumReduceTasks(0);
    FileInputFormat.addInputPath(job, new Path(input));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
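The mapper above derives its row key with `DigestUtils.md5(lineString)` from Commons Codec, so identical lines map to the same key (re-imports overwrite rather than duplicate) and keys spread evenly across regions. A minimal plain-JDK sketch of that row-key derivation, using `MessageDigest` instead of the Commons Codec helper (the input line here is made up for illustration):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;

public class RowKeyDemo {
  public static void main(String[] args) throws Exception {
    String line = "some input line"; // hypothetical input record

    // MD5 always yields a fixed-length 16-byte digest.
    byte[] rowkey = MessageDigest.getInstance("MD5")
        .digest(line.getBytes(StandardCharsets.UTF_8));
    System.out.println(rowkey.length); // 16

    // The same line always hashes to the same key, so a re-import
    // overwrites the existing row instead of creating a new one.
    byte[] again = MessageDigest.getInstance("MD5")
        .digest(line.getBytes(StandardCharsets.UTF_8));
    System.out.println(Arrays.equals(rowkey, again)); // true
  }
}
```

The trade-off of a hashed key is that rows are stored in hash order, not input order, so range scans over the original lines are not possible; that is fine for a bulk import like this one.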
