How to read and write to HBase in a Flink streaming job


Problem Description



If we have to read from and write to HBase in a streaming application, how can we do that? We open a connection for writing via the open method; how can we open a connection for reading?

object test {

    if (args.length != 11) {
      // print args
      System.exit(1)
    }

    val Array() = args
    println("Parameters Passed " + ...);

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", metadataBrokerList)
    properties.setProperty("zookeeper.connect", zkQuorum)
    properties.setProperty("group.id", group)

    val messageStream = env.addSource(new FlinkKafkaConsumer08[String](topics, new SimpleStringSchema(), properties))

    messageStream.map { x => getheader(x) }

    def getheader(a: String) {
      // Get header and parse and split the headers
      if (metadata not available, hit HBASE) { // Device Level send (Just JSON)
        // How to read from HBASE here?
      } else {
        // If the resultset is not available in the map, fetch from Phoenix
        // fetch from cache
      }
    }

    messageStream.writeUsingOutputFormat(new HBaseOutputFormat());
    env.execute()
}

Now, inside the method getheader, if I want to read from HBase inside if (metadata not available hit HBASE), how could I do that? I don't want to open a connection there; the idea is to maintain a single connection per thread and reuse it, like Flink does for the HBase sink with the open() method, or how Spark does with foreachPartition. I tried this but I cannot pass StreamExecutionEnvironment to methods. How could I achieve this? Could someone provide a snippet?

Solution

You want to read from / write to Apache HBase from a streaming user function. The HBaseReadExample that you linked does something different: it reads an HBase table into a DataSet (Flink's batch processing abstraction). Using that code in a user function would mean starting a Flink program from within a Flink program.

For your use case, you need to directly create an HBase client in your user function and interact with it. The best way to do this is to use a RichFlatMapFunction and create the connection to HBase in the open() method.
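A minimal sketch of that pattern follows. The connection is created once per parallel instance in open() and reused for every record, which is exactly what the question asks for; note the table name "metadata", column family "cf", and qualifier "header" are hypothetical placeholders, not from the original question.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Table}
import org.apache.hadoop.hbase.util.Bytes

class HBaseEnricher extends RichFlatMapFunction[String, String] {
  // Marked @transient: created in open(), never shipped with the serialized function.
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(parameters: Configuration): Unit = {
    // Heavyweight connection created once per parallel task instance.
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf("metadata")) // hypothetical table
  }

  override def flatMap(value: String, out: Collector[String]): Unit = {
    // Reuse the open table for every record; no per-record connection setup.
    val result = table.get(new Get(Bytes.toBytes(value)))
    if (!result.isEmpty) {
      out.collect(Bytes.toString(
        result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("header"))))
    }
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}
```

You would then replace the plain map with messageStream.flatMap(new HBaseEnricher), so the lookup logic lives where the connection does.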

The next version of Flink (1.2.0) will feature support for asynchronous I/O operations in user functions which should improve the throughput of applications significantly.
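As a rough sketch of what that async I/O API looks like in Scala (class and method names shifted slightly between Flink versions, and lookupFromHBase below stands in for a real asynchronous HBase lookup):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

class AsyncHBaseLookup extends AsyncFunction[String, String] {
  implicit lazy val executor: ExecutionContext = ExecutionContext.global

  override def asyncInvoke(key: String, resultFuture: ResultFuture[String]): Unit = {
    // Run the (blocking) HBase get off the operator thread and complete
    // the result future when it finishes. lookupFromHBase is hypothetical.
    Future {
      lookupFromHBase(key)
    }.foreach(value => resultFuture.complete(Iterable(value)))
  }
}

// Cap in-flight requests at 100 and time each one out after 1 second.
val enriched = AsyncDataStream.unorderedWait(
  messageStream, new AsyncHBaseLookup, 1000, TimeUnit.MILLISECONDS, 100)
```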
