Spark Structured Streaming with HBase integration

Question

We are streaming Kafka data that is collected from MySQL. Once all the analytics are done, I want to save the data directly to HBase. I have gone through the Spark Structured Streaming documentation but could not find any sink for HBase. The code I use to read the data from Kafka is below.

 import org.apache.spark.sql.functions.from_json
 import org.apache.spark.sql.types._
 import spark.implicits._  // enables the $"col" syntax; assumes a SparkSession named spark

 val records = spark.readStream.format("kafka")
   .option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667")
   .option("subscribe", "kaapociot")
   .option("startingOffsets", "earliest")
   .load()
 val jsonschema = StructType(Seq(StructField("header", StringType, true), StructField("event", StringType, true)))
 val uschema = StructType(Seq(
   StructField("MeterNumber", StringType, true),
   StructField("Utility", StringType, true),
   StructField("VendorServiceNumber", StringType, true),
   StructField("VendorName", StringType, true),
   StructField("SiteNumber", StringType, true),
   StructField("SiteName", StringType, true),
   StructField("Location", StringType, true),
   StructField("timestamp", LongType, true),
   StructField("power", DoubleType, true)
 ))

 val DF_Hbase = records
   .selectExpr("cast(value as string) as json")
   .select(from_json($"json", jsonschema).as("data"))
   .select("data.event")
   .select(from_json($"event", uschema).as("mykafkadata"))
   .select("mykafkadata.*")

Finally, I want to save the DF_Hbase DataFrame to HBase.

Recommended answer

1- Add these libraries to your project:

      libraryDependencies ++= Seq(
        "org.apache.hbase" % "hbase-client" % "2.0.1",
        "org.apache.hbase" % "hbase-common" % "2.0.1"
      )

2- Add this trait to your code:

   import java.util.concurrent.ExecutorService
   import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
   import org.apache.hadoop.hbase.security.User
   import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
   import org.apache.spark.sql.ForeachWriter

   trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {

     val tableName: String
     val hbaseConfResources: Seq[String]

     def pool: Option[ExecutorService] = None

     def user: Option[User] = None

     private var hTable: Table = _
     private var connection: Connection = _


     override def open(partitionId: Long, version: Long): Boolean = {
       connection = createConnection()
       hTable = getHTable(connection)
       true
     }

     def createConnection(): Connection = {
       val hbaseConfig = HBaseConfiguration.create()
       hbaseConfResources.foreach(hbaseConfig.addResource)
       ConnectionFactory.createConnection(hbaseConfig, pool.orNull, user.orNull)
     }

     def getHTable(connection: Connection): Table = {
       connection.getTable(TableName.valueOf(tableName))
     }

     override def process(record: RECORD): Unit = {
       val put = toPut(record)
       hTable.put(put)
     }

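     override def close(errorOrNull: Throwable): Unit = {
       // Guard against null in case open() did not finish initializing both fields
       if (hTable != null) hTable.close()
       if (connection != null) connection.close()
     }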

     def toPut(record: RECORD): Put

   }

3- Use it in your logic:

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes

    val ds = .... //anyDataset[WhatEverYourDataType]

    val query = ds.writeStream
      .foreach(new HBaseForeachWriter[WhatEverYourDataType] {
        override val tableName: String = "hbase-table-name"
        // your cluster config files; I assume here they are in resources (on the classpath)
        override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")

        override def toPut(record: WhatEverYourDataType): Put = {
          val key = .....
          val columnFamilyName: String = ....
          val columnName: String = ....
          val columnValue = ....

          val p = new Put(Bytes.toBytes(key))
          // Add columns ...
          p.addColumn(Bytes.toBytes(columnFamilyName),
                      Bytes.toBytes(columnName),
                      Bytes.toBytes(columnValue))
          p
        }
      })
      .start()

    query.awaitTermination()
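
To make step 3 more concrete for the schema in the question, here is a minimal sketch of what a `toPut` could look like. Everything specific in it is an assumption made for illustration and does not come from the original answer: the case class name `MeterReading`, the helper object `MeterReadingPut`, the column family `"d"`, and the row key layout `MeterNumber#timestamp`. Choose a row key and column family that match your own table design.

```scala
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical record type mirroring the fields of uschema in the question.
// The numeric fields are Options because the schema declares them nullable.
case class MeterReading(
  MeterNumber: String,
  Utility: String,
  VendorServiceNumber: String,
  VendorName: String,
  SiteNumber: String,
  SiteName: String,
  Location: String,
  timestamp: Option[Long],
  power: Option[Double])

object MeterReadingPut {
  // Assumed column family; it must already exist in the target HBase table.
  val Family: Array[Byte] = Bytes.toBytes("d")

  // Assumed row key layout: meter number plus event timestamp (0 if missing).
  // This presumes MeterNumber itself is never null.
  def rowKey(r: MeterReading): String =
    s"${r.MeterNumber}#${r.timestamp.getOrElse(0L)}"

  def toPut(r: MeterReading): Put = {
    val p = new Put(Bytes.toBytes(rowKey(r)))

    // String columns (MeterNumber is already part of the row key).
    // Null values are skipped, since Bytes.toBytes fails on a null String.
    Seq(
      "Utility" -> r.Utility,
      "VendorServiceNumber" -> r.VendorServiceNumber,
      "VendorName" -> r.VendorName,
      "SiteNumber" -> r.SiteNumber,
      "SiteName" -> r.SiteName,
      "Location" -> r.Location
    ).foreach { case (qualifier, value) =>
      Option(value).foreach { s =>
        p.addColumn(Family, Bytes.toBytes(qualifier), Bytes.toBytes(s))
      }
    }

    // Numeric columns are stored in HBase's binary encoding when present.
    r.timestamp.foreach(t => p.addColumn(Family, Bytes.toBytes("timestamp"), Bytes.toBytes(t)))
    r.power.foreach(d => p.addColumn(Family, Bytes.toBytes("power"), Bytes.toBytes(d)))
    p
  }
}
```

With this in place, the stream from the question could be typed with `DF_Hbase.as[MeterReading]` (which needs `import spark.implicits._`), and the anonymous `HBaseForeachWriter[MeterReading]` would implement `toPut` by delegating to `MeterReadingPut.toPut(record)`.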
