What is the fastest way to bulk load data into HBase programmatically?
Problem description
I have a plain text file with possibly millions of lines which need custom parsing, and I want to load it into an HBase table as fast as possible (using Hadoop or the HBase Java client).

My current solution is based on a MapReduce job without the Reduce part. I use FileInputFormat to read the text file so that each line is passed to the map method of my Mapper class. At this point the line is parsed to form a Put object, which is written to the Context. Then, TableOutputFormat takes the Put object and inserts it into the table.

This solution yields an average insertion rate of 1,000 rows per second, which is less than I expected. My HBase setup is in pseudo-distributed mode on a single server.
One interesting thing is that during insertion of 1,000,000 rows, 25 Mappers (tasks) are spawned but they run serially (one after another); is this normal?
Here is the code for my current solution:
public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Map<String, String> parsedLine = parseLine(value.toString());
        Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
        for (String currentKey : parsedLine.keySet()) {
            row.add(Bytes.toBytes(currentKey), Bytes.toBytes(currentKey), Bytes.toBytes(parsedLine.get(currentKey)));
        }

        try {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);
    job.waitForCompletion(true);
    return 0;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime - startTime));

    System.exit(errCode);
}
I've gone through a process that is probably very similar to yours, trying to find an efficient way to load data from an MR job into HBase. What I found to work is using HFileOutputFormat as the OutputFormatClass of the MR job.

Below is the basis of my code: the function that generates the job and the Mapper map function which writes out the data. This was fast. We don't use it anymore, so I don't have numbers on hand, but it was around 2.5 million records in under a minute.

Here is the (stripped down) function I wrote to generate the job for my MapReduce process to put data into HBase:
private Job createCubeJob(...) {
    // Build and configure job
    Job job = new Job(conf);
    job.setJobName(jobName);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setMapperClass(HiveToHBaseMapper.class); // custom mapper
    job.setJarByClass(CubeBuilderDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    TextInputFormat.setInputPaths(job, hiveOutputDir);
    HFileOutputFormat.setOutputPath(job, cubeOutputPath);

    Configuration hConf = HBaseConfiguration.create(conf);
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

    HTable hTable = new HTable(hConf, tableName);
    HFileOutputFormat.configureIncrementalLoad(job, hTable);

    return job;
}
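Note that HFileOutputFormat.configureIncrementalLoad only sets the job up to write HFiles under cubeOutputPath; after the job finishes, those files still have to be handed to HBase. A minimal sketch of that step, assuming the same hConf, hTable and cubeOutputPath as in the snippet above and using the LoadIncrementalHFiles tool from HBase's mapreduce package (the wrapper class and method here are illustrative, not from the original code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;

public class HFileLoadStep {
    // Runs the HFile-generating job and, if it succeeds, moves the resulting
    // HFiles into the regions of the target table.
    public static void runAndLoad(Job job, Configuration hConf, HTable hTable, Path cubeOutputPath)
            throws Exception {
        if (job.waitForCompletion(true)) {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hConf);
            loader.doBulkLoad(cubeOutputPath, hTable);
        }
    }
}

The same load can also be done from the command line with the completebulkload tool that ships with HBase, if you'd rather not do it from Java.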
This is my map function from the HiveToHBaseMapper class (slightly edited).
public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
    try {
        Configuration config = context.getConfiguration();
        String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
        String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
        String column = strs[COLUMN_INDEX];
        // Parse the value as a double so it can be range-checked and stored as bytes
        double value = Double.parseDouble(strs[VALUE_INDEX]);
        String sKey = generateKey(strs, config);
        byte[] bKey = Bytes.toBytes(sKey);

        Put put = new Put(bKey);
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0)
                ? Bytes.toBytes(Double.MIN_VALUE)
                : Bytes.toBytes(value));

        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);
        context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
    } catch (Exception e) {
        context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);
    }
}
I'm pretty sure this isn't going to be a copy & paste solution for you. Obviously the data I was working with here didn't need any custom processing (that was done in an MR job before this one). The main thing I want to provide out of this is the HFileOutputFormat. The rest is just an example of how I used it. :)

I hope it gets you onto a solid path to a good solution. :)