What is the fastest way to bulk load data into HBase programmatically?

Question

I have a plain text file with possibly millions of lines that needs custom parsing, and I want to load it into an HBase table as fast as possible (using Hadoop or the HBase Java client).

My current solution is based on a MapReduce job with no Reduce part. I use FileInputFormat to read the text file so that each line is passed to the map method of my Mapper class. At that point the line is parsed into a Put object, which is written to the context. Then TableOutputFormat takes the Put object and inserts it into the table.

This solution yields an average insertion rate of 1,000 rows per second, which is less than I expected. My HBase setup is in pseudo-distributed mode on a single server.
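
A commonly suggested tweak on this TableOutputFormat write path (only a rough sketch here, against the 0.90-era client API and with placeholder names) is to skip the write-ahead log on each Put and enlarge the client-side write buffer, trading durability for write speed:

// Rough sketch, 0.90-era HBase client API; rowKeyBytes is a placeholder.
Put row = new Put(rowKeyBytes);
row.setWriteToWAL(false); // skip the WAL for this Put; data is lost if a region server crashes

// In the driver: a bigger client-side write buffer (TableOutputFormat already disables auto-flush).
conf.set("hbase.client.write.buffer", "8388608"); // 8 MB instead of the 2 MB default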

One interesting thing is that during the insertion of 1,000,000 rows, 25 Mappers (tasks) are spawned, but they run serially (one after another); is this normal?

Here is the code for my current solution:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Map<String, String> parsedLine = parseLine(value.toString());

        Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
        for (String currentKey : parsedLine.keySet()) {
            row.add(Bytes.toBytes(currentKey), Bytes.toBytes(currentKey), Bytes.toBytes(parsedLine.get(currentKey)));
        }

        try {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);

    job.waitForCompletion(true);
    return 0;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime-startTime));

    System.exit(errCode);
}

Answer

I've gone through a process that is probably very similar to yours, trying to find an efficient way to load data from an MR job into HBase. What I found to work is using HFileOutputFormat as the OutputFormatClass of the MR job.

Below is the core of my code that generates the job, along with the Mapper map function that writes out the data. This was fast. We don't use it anymore, so I don't have numbers on hand, but it was around 2.5 million records in under a minute.

Here is the (stripped-down) function I wrote to generate the job for my MapReduce process that puts the data into HBase:

private Job createCubeJob(...) {
    //Build and Configure Job
    Job job = new Job(conf);
    job.setJobName(jobName);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper
    job.setJarByClass(CubeBuilderDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    TextInputFormat.setInputPaths(job, hiveOutputDir);
    HFileOutputFormat.setOutputPath(job, cubeOutputPath);

    Configuration hConf = HBaseConfiguration.create(conf);
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

    HTable hTable = new HTable(hConf, tableName);

    HFileOutputFormat.configureIncrementalLoad(job, hTable);
    return job;
}
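
As far as I can tell from that era of the API, HFileOutputFormat.configureIncrementalLoad(job, hTable) does most of the heavy lifting here: it reads the table's current region boundaries and wires in a TotalOrderPartitioner plus a PutSortReducer, with one reduce task per region, so the HFiles that come out line up with existing regions. The target table therefore has to exist before the job is submitted, and pre-splitting it spreads the reduce work across more tasks.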

This is my map function from the HiveToHBaseMapper class (slightly edited):

public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
    try{
        Configuration config = context.getConfiguration();
        String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
        String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
        String column = strs[COLUMN_INDEX];
        double value = Double.parseDouble(strs[VALUE_INDEX]); // numeric cell value, written into the Put below
        String sKey = generateKey(strs, config);
        byte[] bKey = Bytes.toBytes(sKey);
        Put put = new Put(bKey);
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
                        ? Bytes.toBytes(Double.MIN_VALUE)
                        : Bytes.toBytes(value));

        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);

        context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
    }
    catch(Exception e){
        context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);    
    }

}

I'm pretty sure this isn't going to be a copy-and-paste solution for you. Obviously the data I was working with here didn't need any custom processing (that was done in an MR job before this one). The main thing I want to point out is the HFileOutputFormat; the rest is just an example of how I used it. :)
I hope it gets you onto a solid path to a good solution.
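
One step the snippets above leave out is getting the finished HFiles into the live table once the job completes. With the same era of the API that is usually done with LoadIncrementalHFiles (the class behind the completebulkload tool). A minimal sketch, assuming the cubeOutputPath and tableName values used in createCubeJob above:

private void loadHFilesIntoTable(Configuration hConf, Path cubeOutputPath, String tableName) throws Exception {
    // org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles moves the HFiles written by
    // HFileOutputFormat under cubeOutputPath into the regions of the target table.
    HTable hTable = new HTable(hConf, tableName);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hConf);
    loader.doBulkLoad(cubeOutputPath, hTable);
}

The same step can also be run from the command line, roughly as: hadoop jar hbase-VERSION.jar completebulkload <hfile-output-dir> <table-name>.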
