如何实施hbase安全批量加载 [英] how to implement the hbase secure bulk load

查看:108
本文介绍了如何实施hbase安全批量加载的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经在kerberos集群的hbase中创建了一个批量加载,驱动类与此类似(工作):

  public static void main(String [] args){
try {
int response = ToolRunner.run(HBaseConfiguration.create(),new HBaseBulkLoadDriver(),args);
if(response == 0){
System.out.println(Job is successfully completed ...);
} else {
System.out.println(Job failed ...);
}
} catch (Exception exception) {
exception.printStackTrace();



@Override
public int run(String [] args)throws Exception {
int result = 0;

final String inputPath = args [0];
final String outputPath = args [1];
final String keytab = args [2];

Configuration configuration = getConf();


configuration.set(data.seperator,DATA_SEPERATOR);
configuration.set(hbase.table.name,TABLE_NAME);
// configuration.set(INTRO,COLUMN_FAMILY_INTRO);
configuration.set(hbase.zookeeper.quorum,zk_quorum);
configuration.set(hbase.zookeeper.property.clientPort,2181);
configuration.set(hbase.master,master:port);
configuration.set(hadoop.security.authentication,Kerberos);
configuration.set(hbase.security.authentication,kerberos);

//configuration.set(\"COLUMN_FAMILY_2\",COLUMN_FAMILY_2);
Job job = new Job(configuration);
//作业配置
job.setJarByClass(HBaseBulkLoadDriver.class);
job.setJobName(Bulk Loading HBase Table:+ TABLE_NAME);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
// mapper class
job.setMapperClass(HBaseBulkLoadMapper.class);
FileInputFormat.addInputPaths(job,inputPath);
FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);
FileOutputFormat.setOutputPath(job,new Path(outputPath));
job.setMapOutputValueClass(Put.class);
HFileOutputFormat.configureIncrementalLoad(job,new HTable(configuration,TABLE_NAME));

job.waitForCompletion(true);

System.out.println(写入文件夹的输出:+ outputPath);

System.out.println(要继续加载文件user:hbase:hbase必须拥有该文件夹!);

System.out.println(hbase用户是否存在文件夹?按Y加载数据,按N并且作业将失败);

String IsHbaseOwnerOftheFolder = System.console().readLine();

if (job.isSuccessful() && IsHbaseOwnerOftheFolder.equals("Y")) {
HBaseBulkLoad.doBulkLoad(outputPath,keytab,TABLE_NAME);
} else {
result = -1;
}
返回结果;
}

现在我想实施安全批量加载,但似乎必须使用协处理器框架(hbase 1.0.0)实现有人可以给我一个关于如何使用securebulkloadHFiles方法的完整示例吗?
感谢您的帮助

解决方案

我会回答自己的问题:

对于这个答案,hbase中的表格必须已经存在,而且必须已经为导入生成HFile

  import java.util.ArrayList; 
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;

String keyTab =pathtokeytabfile;
String tableName =tb_name;
String pathToHFile =/ tmp / tmpfiles /;
Configuration configuration = new Configuration();

configuration.set(hbase.zookeeper.quorum,ZK_QUORUM);
configuration.set(hbase.zookeeper+.property.clientPort,2181);
configuration.set(hbase.master,MASTER:60000);
configuration.set(hadoop.security.authentication,Kerberos);
configuration.set(hbase.security.authentication,kerberos);


//获取Kerberos认证

UserGroupInformation.setConfiguration(configuration);

UserGroupInformation.loginUserFromKeytab(keytab,key选项卡的路径);

HBaseAdmin.checkHBaseAvailable(configuration);

System.out.println(HBase is running!);

HBaseConfiguration.addHbaseResources(configuration);

Connection conn = ConnectionFactory.createConnection(configuration);

Table table = conn.getTable(TableName.valueOf(tableName));

HRegionInfo tbInfo = new HRegionInfo(table.getName());


//需要加载的HFiles的路径

Path hfofDir = new Path(pathToHFile);

//获取认证用户令牌

UserProvider up = UserProvider.instantiate(configuration);

FsDelegationToken fsDelegationToken = new FsDelegationToken(up, "name of the key tab user");

fsDelegationToken.acquireDelegationToken(hfofDir.getFileSystem(configuration));

//准备批量加载

SecureBulkLoadClient secureBulkLoadClient = new SecureBulkLoadClient(table);

String bulkToken = secureBulkLoadClient.prepareBulkLoad(table.getName());

System.out.println(bulkToken);

//创建家庭列表(家族姓名列表和与该姓氏对应的hfile的路径)

final List<Pair<byte[], String>> famPaths = new ArrayList<>();

Pair p = new Pair();

//家庭名称
p.setFirst(nameofthefamily.getBytes());

// HFile的路径(HFile在家族名称的文件夹中组织)
p.setSecond(/ tmp / tmpfiles / INTRO / nameofthefilehere);

famPaths.add(p);

//使用安全批量加载客户端批量加载

secureBulkLoadClient.bulkLoadHFiles(famPaths,fsDelegationToken.getUserToken(),bulkToken,tbInfo.getStartKey());

System.out.println(Bulk Load Completed ..);


I already created a bulk load in hbase in a kerberos cluster with a driver class similar to this (working):

/**
 * Entry point: runs the bulk-load driver through ToolRunner and prints
 * whether the job succeeded (exit code 0) or failed.
 */
public static void main(String[] args) {
    try {
        final int exitCode =
                ToolRunner.run(HBaseConfiguration.create(), new HBaseBulkLoadDriver(), args);
        System.out.println(exitCode == 0 ? "Job is successfully completed..." : "Job failed...");
    } catch (Exception exception) {
        exception.printStackTrace();
    }
}

@Override
public int run(String[] args) throws Exception {
    int result=0;

    final String inputPath = args[0];   
    final String outputPath = args[1];      
    final String keytab = args[2];  

    Configuration configuration = getConf();        


    configuration.set("data.seperator", DATA_SEPERATOR);        
    configuration.set("hbase.table.name",TABLE_NAME);
   // configuration.set("INTRO",COLUMN_FAMILY_INTRO);
    configuration.set("hbase.zookeeper.quorum","zk_quorum");
    configuration.set("hbase.zookeeper.property.clientPort","2181");
    configuration.set("hbase.master","master:port");
    configuration.set("hadoop.security.authentication", "Kerberos");
    configuration.set("hbase.security.authentication", "kerberos");

        //configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2);     
    Job job = new Job(configuration);       
    // job configuration
    job.setJarByClass(HBaseBulkLoadDriver.class);       
    job.setJobName("Bulk Loading HBase Table:"+TABLE_NAME);     
    job.setInputFormatClass(TextInputFormat.class);     
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    //mapper class
    job.setMapperClass(HBaseBulkLoadMapper.class);      
    FileInputFormat.addInputPaths(job,inputPath);   
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);      
    FileOutputFormat.setOutputPath(job, new Path(outputPath));      
    job.setMapOutputValueClass(Put.class);      
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME));  

    job.waitForCompletion(true);         

    System.out.println("Output written to folder :" + outputPath);

    System.out.println("To proceed loading files user: hbase:hbase must own recursivly the folder!");

    System.out.println("Is hbase user owing the folder?press Y to load the data , press N and job will fail");

    String IsHbaseOwnerOftheFolder = System.console().readLine();

    if (job.isSuccessful() && IsHbaseOwnerOftheFolder.equals("Y")) {
        HBaseBulkLoad.doBulkLoad(outputPath, keytab, TABLE_NAME);
    } else {
        result = -1;
    }
    return result;
}

Now I would like to implement the secure bulk load but seem that this must be implemented using coprocessor framework (hbase 1.0.0) can someone give me a complete example of how to use the securebulkloadHFiles method ? Thanks for the help

解决方案

I will answer myself to my question :

For this answer to work a table in hbase must already exist moreover HFile have to be already generated for the import

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;

// Secure bulk load example. Preconditions: the target table already exists
// and the HFiles have already been generated under pathToHFile.

String keyTab = "pathtokeytabfile";
String tableName = "tb_name";
String pathToHFile = "/tmp/tmpfiles/";
Configuration configuration = new Configuration();

configuration.set("hbase.zookeeper.quorum", "ZK_QUORUM");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.master", "MASTER:60000");
configuration.set("hadoop.security.authentication", "Kerberos");
configuration.set("hbase.security.authentication", "kerberos");

// Obtaining kerberos authentication: loginUserFromKeytab(principal, keytabPath).
// The original passed placeholder words that do not compile and never used the
// keyTab variable declared above.
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab("keytab-user@YOUR.REALM", keyTab);

HBaseAdmin.checkHBaseAvailable(configuration);
System.out.println("HBase is running!");

HBaseConfiguration.addHbaseResources(configuration);

Connection conn = ConnectionFactory.createConnection(configuration);
Table table = conn.getTable(TableName.valueOf(tableName));
HRegionInfo tbInfo = new HRegionInfo(table.getName());

// Path to the HFiles that need to be loaded.
Path hfofDir = new Path(pathToHFile);

// Acquiring a filesystem delegation token for the authenticated user.
UserProvider up = UserProvider.instantiate(configuration);
FsDelegationToken fsDelegationToken = new FsDelegationToken(up, "name of the key tab user");
fsDelegationToken.acquireDelegationToken(hfofDir.getFileSystem(configuration));

// Preparing for the bulk load: the token authorizes the staging directory.
SecureBulkLoadClient secureBulkLoadClient = new SecureBulkLoadClient(table);
String bulkToken = secureBulkLoadClient.prepareBulkLoad(table.getName());
System.out.println(bulkToken);

// Family list: pairs of (family name, path to the HFile for that family).
// HFiles are organized in one sub-folder per column family.
final List<Pair<byte[], String>> famPaths = new ArrayList<>();

// Typed pair — the original used a raw Pair, defeating the generic list type.
Pair<byte[], String> p = new Pair<>();
p.setFirst("nameofthefamily".getBytes());
p.setSecond("/tmp/tmpfiles/INTRO/nameofthefilehere");
famPaths.add(p);

// Bulk loading, using the secure bulk load client.
secureBulkLoadClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(), bulkToken, tbInfo.getStartKey());

System.out.println("Bulk Load Completed..");

// Release resources — the original leaked the token, table and connection.
fsDelegationToken.releaseDelegationToken();
table.close();
conn.close();

这篇关于如何实施hbase安全批量加载的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆