使用coprocesor HBase创建二级索引 [英] Create secondary index using coprocesor HBase

查看:107
本文介绍了使用coprocesor HBase创建二级索引的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在尝试编写我自己的协处理器,它使用prePut钩子创建一个二级索引。首先,我一直试图让prePut协处理器工作。到目前为止,我可以让协处理器添加到传递给它的放置对象。我发现的是,我无法让协处理器写入与传入的put对象写入的行分开的行。很显然,要创建一个二级索引,我需要弄清楚这一点。

I've been trying to write my own coprocessor that creates a secondary index using the prePut hook. To start, I've been simply trying to get a prePut coprocessor to work. So far I can have the coprocessor add to the put object passed to it. What i've found is that I cannot get the coprocessor to write to a row separate from what the passed in put object is writing to. Obviously to create a secondary index, I need to figure this one out.

以下是我的协处理器的代码,但它不起作用。

是的,所有表都存在,并且'colfam1'也存在。

HBase版本:来自Cloudera CDH4的HBase 0.92.1-cdh4.1.2

Below is the code for my coprocessor, but it doesn't work.
Yes, all tables exists, and 'colfam1' exists too.
HBase Version: HBase 0.92.1-cdh4.1.2 from Cloudera's CDH4

有人知道问题所在吗?

    @Override
        public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException {          
            KeyValue kv = new KeyValue(Bytes.toBytes("COPROCESSORROW"), Bytes.toBytes("colfam1"),Bytes.toBytes("COPROCESSOR: "+System.currentTimeMillis()),Bytes.toBytes("IT WORKED"));
            put.add(kv);
        }

我收到以下错误:

I get the following error:

    ERROR: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: IOException: 1 time, servers with issues:

更新:

我将协处理器修改为以下版本,米仍然出现错误。现在post-Put(二级索引)被写入,但仍然有超时错误。

该区域的整个表崩溃也要求我重新启动该区域。有时候区域重启不起作用,整个区域(所有表)
都会损坏,需要重建服务器。

I've modified my coprocessor to the following, but I'm still getting an error. Now the post-Put (secondary index) is written, but there is still a timeout error.
The entire table on the region crashes too requiring me to restart the region. Sometimes a region restart doesn't work and the entire region (all tables) are corrupted requiring a server rebuild.

我不知道为什么... !

I have no idea why...!?

@Override
      public void start(CoprocessorEnvironment env) throws IOException {        
        LOG.info("(start)");
        pool = new HTablePool(env.getConfiguration(), 10);
     }

    @Override
    public void postPut(final ObserverContext<RegionCoprocessorEnvironment> observerContext,final Put put,final WALEdit edit,final boolean writeToWAL) throws IOException {
        byte[] tableName  = observerContext.getEnvironment().getRegion().getRegionInfo().getTableName();

        //not necessary though if you register the coprocessor for the specific table , SOURCE_TBL
        if (!Bytes.equals(tableName, Bytes.toBytes(SOURCE_TABLE))) 
            return;         

        try {           
            LOG.info("STARTING postPut");
            HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE));
            LOG.info("TURN OFF AUTOFLUSH");
            table.setAutoFlush(false);
            //create row              
            LOG.info("Creating new row");            
            byte [] rowkey = Bytes.toBytes("COPROCESSOR ROW");
            Put indexput  = new Put(rowkey); 
            indexput.add(Bytes.toBytes ( "data"),  Bytes.toBytes("CP: "+System.currentTimeMillis()),  Bytes.toBytes("IT WORKED!"));
            LOG.info("Writing to table");
            table.put(indexput);
            LOG.info("flushing commits");            
            table.flushCommits();
            LOG.info("close table");
            table.close();

        } catch ( IllegalArgumentException ex) {

            //handle excepion.
        }

      }


      @Override
      public void stop(CoprocessorEnvironment env) throws IOException {
        LOG.info("(stop)");
        pool.close();
      }

以下是区域服务器日志:(注意我的日志记录注释)

Here is the region server log: (note my logging comments)

2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: STARTING postPut
2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: TURN OFF AUTOFLUSH
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Creating new row
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Writing to table
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: flushing commits
2013-01-30 19:31:39,813 WARN org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation: Failed all from region=test_table,,1359573731255.d41b77b31fafa6502a8f09db9c56b9d8., hostname=node01, port=60020
java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Call to node01/<private_ip>:60020 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<private_ip>:56390 remote=node01/<private_ip>:60020]
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatchCallback(HConnectionManager.java:1557)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatch(HConnectionManager.java:1409)
    at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:949)
    at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.flushCommits(HTablePool.java:449)
    at my.package.MyCoprocessor.postPut(MyCoprocessor.java:81)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postPut(RegionCoprocessorHost.java:682)
    at org.apache.hadoop.hbase.regionserver.HRegion.doMiniBatchPut(HRegion.java:1901)
    at org.apache.hadoop.hbase.regionserver.HRegion.put(HRegion.java:1742)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3102)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:364)
    at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1345)

解决方案:我正试图写信给我的coprocoessor中的同一张表,协处理器正在开发中:简而言之,当我编写一个单元格时,CP写了一个单元格,导致CP再次触发并写入另一个单元格等等。我停止了它通过执行行检查b4写入CP行以防止此循环。

Solved: I was trying to write to the same table in my coprocoessor that the coprocessor was working on: in short, when I wrote a cell, the CP wrote a cell causing the CP to trigger again and write another and so on and on. I stopped it by doing a row check b4 writting the CP row to prevent this loop.

推荐答案

以下代码片段我们使用Hbase中的协处理器来创建二级索引。

Below is snippet of code on how we use Coprocessors in Hbase to create secondary index. Can be helpful to you.

public class TestCoprocessor extends BaseRegionObserver{

    private HTablePool pool = null;

    private final static String  INDEX_TABLE  = "INDEX_TBL";
    private final static String  SOURCE_TABLE = "SOURCE_TBL";

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {  
        pool = new HTablePool(env.getConfiguration(), 10);
    }

    @Override
    public void postPut(
        final ObserverContext<RegionCoprocessorEnvironment> observerContext,
        final Put put,
        final WALEdit edit,
        final boolean writeToWAL)
        throws IOException {

        byte[] table = observerContext.getEnvironment(
            ).getRegion().getRegionInfo().getTableName();

        // Not necessary though if you register the coprocessor
        // for the specific table, SOURCE_TBL
        if (!Bytes.equals(table, Bytes.toBytes(SOURCE_TABLE))) {
            return; 
        }

        try {
            final List<KeyValue> filteredList = put.get(
                Bytes.toBytes ( "colfam1"), Bytes.toBytes(" qaul"));
            filteredList.get( 0 ); //get the column value

            // get the values 
            HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE));

            // create row key             
            byte [] rowkey = mkRowKey () //make the row key
            Put indexput = new Put(rowkey); 
            indexput.add(
                Bytes.toBytes( "colfam1"),
                Bytes.toBytes(" qaul"),
                Bytes.toBytes(" value.."));

            table.put(indexput);
            table.close();

        } catch ( IllegalArgumentException ex) {
            // handle excepion.
        }

    }


    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        pool.close();
    }

}

注册上面的协处理器SOURCE_BL,进入hbase shell并按照以下步骤操作

To register the above coprocessor on the SOURCE_BL, go to the hbase shell and follow the below steps


  1. 禁用'SOURCE_TBL'

  2. alter 'SOURCE_TBL',METHOD =>
    'table_att','coprocessor'=>'file:///path/to/coprocessor.jar | TestCoprocessor | 1001'
  3. 启用'SOURCE_TBL'

这篇关于使用coprocesor HBase创建二级索引的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆