Insert Spark dataframe into HBase


Question

I have a DataFrame and I want to insert it into HBase. I am following this documentation.

This is what my DataFrame looks like:

+----+-------+---------+
| id | name  | address |
+----+-------+---------+
| 23 | marry | france  |
| 87 | zied  | italie  |
+----+-------+---------+
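(A minimal sketch for reproducing this DataFrame, assuming an existing SparkSession named spark; the id column is kept as a string since it is used as the HBase row key later:)

import spark.implicits._

// sample data matching the table above
val df = Seq(
  ("23", "marry", "france"),
  ("87", "zied", "italie")
).toDF("id", "name", "address")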

I create the HBase table using this code:

val tableName = "two"
val conf = HBaseConfiguration.create()
if(!admin.isTableAvailable(tableName)) {
          print("-----------------------------------------------------------------------------------------------------------")
          val tableDesc = new HTableDescriptor(tableName)
          tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
          admin.createTable(tableDesc)
        }else{
          print("Table already exists!!--------------------------------------------------------------------------------------")
        }

And now, how can I insert this DataFrame into HBase?

In another example I succeeded in inserting into HBase with this code:

// old-style client API; HTable and Put.add are deprecated in newer HBase releases
val myTable = new HTable(conf, tableName)
for (i <- 0 to 1000) {
  val p = new Put(Bytes.toBytes("" + i)) // row key
  p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes("" + (i * 5)))
  p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2017-04-20"))
  p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes("" + i))
  p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes("" + i))
  myTable.put(p)
}
myTable.flushCommits()

But now I am stuck: how do I insert each record of my DataFrame into my HBase table?

Thank you for your time and effort.

Answer

Below is a full example using the Spark HBase connector (SHC) from Hortonworks, available on Maven.
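For reference, a possible sbt setup; the coordinates and version below are assumptions, so pick the shc-core build that matches your Spark and Scala versions from the Hortonworks repository:

// build.sbt (sketch; version is an assumption, adjust to your stack)
resolvers += "Hortonworks Repository" at "http://repo.hortonworks.com/content/groups/public/"

libraryDependencies += "com.hortonworks" % "shc-core" % "1.1.1-2.1-s_2.11"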

This example shows:

  • how to check whether an HBase table exists

  • how to create the HBase table if it does not exist

  • how to insert the DataFrame into the HBase table

import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object Main extends App {

  case class Employee(key: String, fName: String, lName: String, mName: String,
                      addressLine: String, city: String, state: String, zipCode: String)

  // as pre-requisites the table 'employee' with column families 'person' and 'address' should exist
  val tableNameString = "default:employee"
  val colFamilyPString = "person"
  val colFamilyAString = "address"
  val tableName = TableName.valueOf(tableNameString)
  val colFamilyP = colFamilyPString.getBytes
  val colFamilyA = colFamilyAString.getBytes

  val hBaseConf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(hBaseConf)
  val admin = connection.getAdmin()

  println("Check if table 'employee' exists:")
  val tableExistsCheck: Boolean = admin.tableExists(tableName)
  println(s"Table " + tableName.toString + " exists? " + tableExistsCheck)

  if (!tableExistsCheck) {
    println("Create Table employee with column families 'person' and 'address'")
    val colFamilyBuild1 = ColumnFamilyDescriptorBuilder.newBuilder(colFamilyP).build()
    val colFamilyBuild2 = ColumnFamilyDescriptorBuilder.newBuilder(colFamilyA).build()
    val tableDescriptorBuild = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(colFamilyBuild1)
      .setColumnFamily(colFamilyBuild2)
      .build()
    admin.createTable(tableDescriptorBuild)
  }

  // define schema for the dataframe that should be loaded into HBase
  def catalog =
    s"""{
       |"table":{"namespace":"default","name":"employee"},
       |"rowkey":"key",
       |"columns":{
       |"key":{"cf":"rowkey","col":"key","type":"string"},
       |"fName":{"cf":"person","col":"firstName","type":"string"},
       |"lName":{"cf":"person","col":"lastName","type":"string"},
       |"mName":{"cf":"person","col":"middleName","type":"string"},
       |"addressLine":{"cf":"address","col":"addressLine","type":"string"},
       |"city":{"cf":"address","col":"city","type":"string"},
       |"state":{"cf":"address","col":"state","type":"string"},
       |"zipCode":{"cf":"address","col":"zipCode","type":"string"}
       |}
       |}""".stripMargin

  // define some test data
  val data = Seq(
    Employee("1","Horst","Hans","A","12main","NYC","NY","123"),
    Employee("2","Joe","Bill","B","1337ave","LA","CA","456"),
    Employee("3","Mohammed","Mohammed","C","1Apple","SanFran","CA","678")
  )

  // create SparkSession
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("HBaseConnector")
    .getOrCreate()

  // serialize data
  import spark.implicits._
  val df = spark.sparkContext.parallelize(data).toDF

  // write dataframe into HBase
  df.write.options(
    Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "3")) // create 3 regions
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save()

}
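For completeness, the same catalog can be used to read the table back into a DataFrame, a short sketch with the same connector:

// read the HBase table back with the same catalog definition
val readDf = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

readDf.show()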

This worked for me as long as the relevant site XMLs (core-site.xml, hbase-site.xml, hdfs-site.xml) were available in my resources.
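Alternatively, without the connector dependency, the plain client API from the question can be applied per partition of the DataFrame. A minimal sketch, assuming the table "two" with column family "z1" from the question exists and all columns are strings:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

df.rdd.foreachPartition { rows =>
  // one connection per partition, created on the executor
  val conf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(conf)
  val table = connection.getTable(TableName.valueOf("two"))
  try {
    rows.foreach { row =>
      // row key = id column; adjust the getAs types to your schema
      val put = new Put(Bytes.toBytes(row.getAs[String]("id")))
      put.addColumn(Bytes.toBytes("z1"), Bytes.toBytes("name"),
        Bytes.toBytes(row.getAs[String]("name")))
      put.addColumn(Bytes.toBytes("z1"), Bytes.toBytes("address"),
        Bytes.toBytes(row.getAs[String]("address")))
      table.put(put)
    }
  } finally {
    table.close()
    connection.close()
  }
}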
