How to read and write to HBase in a Flink streaming job


Problem description


If we have to read from and write to HBase in a streaming application, how can we do that? We open a connection via the open method for writing; how can we open a connection for reading?

object test {

    if (args.length != 11) {
      //print args
      System.exit(1)
    }

    val Array() = args
    println("Parameters Passed " + ...);

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", metadataBrokerList)
    properties.setProperty("zookeeper.connect", zkQuorum)
    properties.setProperty("group.id", group)

    val messageStream = env.addSource(new FlinkKafkaConsumer08[String](topics, new SimpleStringSchema(), properties))

    messageStream.map { x => getheader(x) }

    def getheader(a: String) {
      //Get the header, parse and split the headers
      if (metadata not available hit HBASE) { //Device level send (just JSON)
        //How to read from HBase here?
      }
      //If the result set is not available in the map, fetch from Phoenix
      else {
        //fetch from cache
      }
    }

    messageStream.writeUsingOutputFormat(new HBaseOutputFormat());
    env.execute()
}


Now, inside the method getheader, if I want to read from HBase inside if (metadata not available hit HBASE), how can I do that? I don't want to open a connection there; the idea is to maintain a single connection per thread and reuse it, like Flink does with the HBase sink's open() method, or how Spark does with foreachPartition. I tried this, but I cannot pass the StreamExecutionEnvironment to methods. How can I achieve this? Could someone provide a snippet?

Answer


You want to read from / write to Apache HBase from a streaming user function. The HBaseReadExample that you linked is doing something different: it reads an HBase table into a DataSet (Flink's batch processing abstraction). Using that code in a user function would mean starting a Flink program from within a Flink program.


For your use case, you need to directly create an HBase client in your user function and interact with it. The best way to do this is to use a RichFlatMapFunction and create the connection to HBase in the open() method.
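A minimal sketch of that pattern, assuming the Flink streaming API and the HBase 1.x client. The table name "metadata", column family "cf", qualifier "header", and the enrichment logic are placeholders standing in for the asker's getheader lookup; the connection is created once per parallel task in open() and reused for every record.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Table}
import org.apache.hadoop.hbase.util.Bytes

class HBaseEnrichFunction extends RichFlatMapFunction[String, String] {

  // One connection per parallel task instance, created once in open()
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create() // reads hbase-site.xml from the classpath
    connection = ConnectionFactory.createConnection(conf)
    table = connection.getTable(TableName.valueOf("metadata")) // placeholder table name
  }

  override def flatMap(value: String, out: Collector[String]): Unit = {
    // Reuses the connection opened in open(); no per-record connection setup
    val result = table.get(new Get(Bytes.toBytes(value)))
    if (!result.isEmpty) {
      val header = Bytes.toString(
        result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("header")))
      out.collect(value + "," + header) // metadata found in HBase
    } else {
      out.collect(value) // no metadata; emit the record unchanged
    }
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}
```

You would then replace the plain map call with messageStream.flatMap(new HBaseEnrichFunction()). Note that the blocking table.get still stalls the task while the lookup is in flight, which is why the async I/O support mentioned below matters for throughput.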


The next version of Flink (1.2.0) will feature support for asynchronous I/O operations in user functions, which should improve the throughput of applications significantly.
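A sketch of how that async I/O API can be used, assuming the Flink Scala async API; lookupAsync here is a hypothetical non-blocking HBase lookup (any client call returning a Scala Future would do), not part of the HBase client itself.

```scala
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class AsyncHBaseLookup extends AsyncFunction[String, String] {
  implicit lazy val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical non-blocking lookup; replace with a real async HBase client call
  private def lookupAsync(key: String): Future[String] =
    Future(key) // placeholder body

  override def asyncInvoke(key: String, resultFuture: ResultFuture[String]): Unit =
    lookupAsync(key).onComplete {
      case Success(v) => resultFuture.complete(Seq(v))
      case Failure(_) => resultFuture.complete(Seq.empty) // drop on lookup failure
    }
}

// Wire it in with a timeout; many lookups are then in flight concurrently:
// val enriched = AsyncDataStream.unorderedWait(
//   messageStream, new AsyncHBaseLookup, 1000, TimeUnit.MILLISECONDS)
```

Unlike the RichFlatMapFunction approach, the operator does not block while a request is outstanding, so a single task can keep many HBase lookups in flight at once.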

