HBase Map-only Row Delete


Problem description

This is my first time writing an HBase MapReduce job, and I'm having trouble deleting rows in HBase (I'm trying to run it as a map-only job). The job succeeds and scans the HBase table, and the mapper reads the correct row keys from HBase (verified through sysout). However, the call to Delete del = new Delete(row.get()) doesn't seem to actually do anything.

Below is the code I'm trying to run:

HBaseDelete.java

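import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class HBaseDelete {
  public static void main(String[] args) throws Exception {

    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "log_table");
    job.setJarByClass(HBaseDeleteMapper.class);

    // Full-table scan that feeds the mapper
    Scan scan = new Scan();
    scan.setCaching(500);        // rows fetched per scanner RPC
    scan.setCacheBlocks(false);  // don't fill the block cache from a MapReduce scan

    TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);    // map-only job

    boolean b = job.waitForCompletion(true);
    if (!b) {
        throw new IOException("error with job!");
    }

  }
}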

HBaseDeleteMapper.java

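import java.io.IOException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, Delete> {
  @Override
  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    Delete delete = new Delete(row.get());
    context.write(row, delete);
  }
}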

Is there something missing to 'commit' the deletion?

Solution

You're writing the Delete to the context, not to the table, and because the job uses NullOutputFormat, whatever the mapper writes is discarded. Your mapper should look something like this:

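import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;

public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, NullWritable> {

    private HTable myTable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes ("myTable" is a placeholder for your table name) */
        myTable = new HTable(HBaseConfiguration.create(), Bytes.toBytes("myTable"));
    }

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        myTable.delete(new Delete(row.get())); /* Delete the row from the table */
        //context.write(row, NullWritable.get()); /* Optional: emit the deleted row keys if you need them for something (skip it if you don't) */
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        myTable.close(); /* Close table */
    }

}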

Note that delete operations don't use the client-side write buffer, so this code issues one RPC per delete, which is inefficient for this kind of job. To address that, you can build your own List<Delete> and submit the deletes in batches:

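import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;

public class HBaseDeleteMapper extends TableMapper<NullWritable, NullWritable> {

    private static final int BATCH_SIZE = 10000; /* Buffer size, tune it as desired */

    private HTable myTable;
    private final List<Delete> deleteList = new ArrayList<Delete>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes ("myTable" is a placeholder for your table name) */
        myTable = new HTable(HBaseConfiguration.create(), Bytes.toBytes("myTable"));
    }

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        deleteList.add(new Delete(row.get())); /* Add delete to the batch */
        if (deleteList.size() >= BATCH_SIZE) {
            myTable.delete(deleteList); /* Submit batch */
            deleteList.clear(); /* Clear batch */
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (!deleteList.isEmpty()) {
            myTable.delete(deleteList); /* Submit remaining batch */
        }
        myTable.close(); /* Close table */
    }

}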
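
To see how the batching above behaves without a cluster, here is a minimal, HBase-free sketch of the same buffer-and-flush pattern. BatchBuffer and BatchBufferDemo are hypothetical names used only for illustration (they are not part of HBase or Hadoop), and the sink stands in for the call to myTable.delete(deleteList). Here add() plays the role of map() and the final flush() plays the role of cleanup().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Demo driver: 25 items with a batch size of 10 reach the sink in 3 calls instead of 25. */
class BatchBufferDemo {
    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        // The sink is a stand-in for myTable.delete(deleteList); it only records batch sizes.
        BatchBuffer<String> buffer = new BatchBuffer<>(10, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 25; i++) {
            buffer.add("row-" + i);
        }
        buffer.flush(); // like cleanup(): submit the remaining partial batch
        System.out.println(batchSizes); // prints [10, 10, 5]
    }
}

/** Hypothetical helper (not an HBase class): collects items and hands them to a sink in batches. */
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Queue one item and flush as soon as the batch is full. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Submit whatever is pending; does nothing when the buffer is empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending)); // pass a copy so the sink may keep or modify it
        pending.clear();
    }
}
```

The final flush is the step that is easiest to forget: without it, the last partial batch (5 items here) would never be submitted, which is why the mapper above submits the remaining deletes in cleanup().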
