What is the best possible way of interacting with HBase using PySpark?

Problem Description

I am using PySpark [Spark 2.3.1] and HBase 1.2.1, and I am wondering what the best possible way of accessing HBase from PySpark could be.

I did some initial searching and found that there are a few options available, such as shc-core:1.1.1-2.1-s_2.11.jar, through which this could be achieved, but wherever I look for examples, in most places the code is written in Scala or the examples are also Scala-based. I tried implementing basic code in PySpark:

from pyspark import SparkContext
from pyspark.sql import SQLContext

def main():
    sc = SparkContext()
    sqlc = SQLContext(sc)
    data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
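    # Catalog that maps the HBase table 'firsttable' to a Spark SQL schema: the row key plus one column ('colname') in column family 'd'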
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"firsttable"},
        "rowkey":"key",
        "columns":{
            "firstcol":{"cf":"rowkey", "col":"key", "type":"string"},
            "secondcol":{"cf":"d", "col":"colname", "type":"string"}
        }
    }""".split())
    df = sqlc.read.options(catalog=catalog).format(data_source_format).load()
    df.select("secondcol").show()

# entry point for PySpark application
if __name__ == '__main__':
    main()

and running it using:

spark-submit  --master yarn-client --files /opt/hbase-1.1.2/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11  --jars /home/ubuntu/hbase-spark-2.0.0-alpha4.jar HbaseMain2.py

This returns blank output:

+---------+
|secondcol|
+---------+
+---------+

I am not sure what I am doing wrong. I am also not sure what the best approach for doing this would be.

Any references would be appreciated.

Regards

Recommended Answer

Finally, using SHC, I am able to connect to HBase 1.2.1 with Spark 2.3.1 using PySpark code. The following is my work:

  • All my Hadoop [namenode, datanode, nodemanager, resourcemanager] and HBase [HMaster, HRegionServer, HQuorumPeer] daemons were up and running on my EC2 instance.

I placed the emp.csv file at the HDFS location /test/emp.csv, with the following data (a sketch of one way to upload it is shown after the sample rows):

key,empId,empName,empWeight
1,"E007","Bhupesh",115.10
2,"E008","Chauhan",110.23
3,"E009","Prithvi",90.0
4,"E0010","Raj",80.0
5,"E0011","Chauhan",100.0

    • I created a readwriteHBase.py file with the following code [it reads the emp.csv file from HDFS, first creates tblEmployee in HBase, pushes the data into tblEmployee, and then reads some data back from the same table and displays it on the console]:

      from pyspark.sql import SparkSession
      
      def main():
          spark = SparkSession.builder.master("yarn-client").appName("HelloSpark").getOrCreate()
      
          dataSourceFormat = "org.apache.spark.sql.execution.datasources.hbase"
          writeCatalog = ''.join("""{
                      "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
                      "rowkey":"key",
                      "columns":{
                        "key":{"cf":"rowkey", "col":"key", "type":"int"},
                        "empId":{"cf":"personal","col":"empId","type":"string"},
                        "empName":{"cf":"personal", "col":"empName", "type":"string"},
                        "empWeight":{"cf":"personal", "col":"empWeight", "type":"double"}
                      }
                    }""".split())
      
          writeDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/test/emp.csv")
          print("csv file read", writeDF.show())
          writeDF.write.options(catalog=writeCatalog, newtable=5).format(dataSourceFormat).save()
          print("csv file written to HBase")
      
          readCatalog = ''.join("""{
                      "table":{"namespace":"default", "name":"tblEmployee"},
                      "rowkey":"key",
                      "columns":{
                        "key":{"cf":"rowkey", "col":"key", "type":"int"},
                        "empId":{"cf":"personal","col":"empId","type":"string"},
                        "empName":{"cf":"personal", "col":"empName", "type":"string"}
                      }
                    }""".split())
      
          print("going to read data from Hbase table")
          readDF = spark.read.options(catalog=readCatalog).format(dataSourceFormat).load()
          print("data read from HBase table")
          readDF.select("empId", "empName").show()
          readDF.show()
      
      # entry point for PySpark application
      if __name__ == '__main__':
          main()
      

    • I ran this script on the VM console using the command:

      spark-submit --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://nexus-private.hortonworks.com/nexus/content/repositories/IN-QA/ readwriteHBase.py
      

    • Intermediate result, after reading the CSV file:

      +---+-----+-------+---------+
      |key|empId|empName|empWeight|
      +---+-----+-------+---------+
      |  1| E007|Bhupesh|    115.1|
      |  2| E008|Chauhan|   110.23|
      |  3| E009|Prithvi|     90.0|
      |  4|E0010|    Raj|     80.0|
      |  5|E0011|Chauhan|    100.0|
      +---+-----+-------+---------+
      

    • Final output, after reading data from the HBase table:

      +-----+-------+
      |empId|empName|
      +-----+-------+
      | E007|Bhupesh|
      | E008|Chauhan|
      | E009|Prithvi|
      |E0010|    Raj|
      |E0011|Chauhan|
      +-----+-------+
      

      Note: While creating the HBase table and inserting data into it, SHC expects the number of regions to be greater than 3, hence I added options(catalog=writeCatalog, newtable=5) while writing the data to HBase.
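
As a small addition (not part of the original answer): the DataFrame returned by the SHC read is an ordinary Spark DataFrame, so the usual operations such as filter and select apply to it. Below is a minimal sketch, reusing the read catalog from readwriteHBase.py above and assuming the job is submitted with the same shc-core package (the appName is arbitrary):

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.appName("HBaseReadFilterSketch").getOrCreate()

      dataSourceFormat = "org.apache.spark.sql.execution.datasources.hbase"
      # Same read catalog as in readwriteHBase.py above: row key plus two columns from column family 'personal'
      readCatalog = ''.join("""{
                  "table":{"namespace":"default", "name":"tblEmployee"},
                  "rowkey":"key",
                  "columns":{
                    "key":{"cf":"rowkey", "col":"key", "type":"int"},
                    "empId":{"cf":"personal","col":"empId","type":"string"},
                    "empName":{"cf":"personal", "col":"empName", "type":"string"}
                  }
                }""".split())

      # The result of the SHC read is a regular DataFrame, so filter/select/show work as usual
      empDF = spark.read.options(catalog=readCatalog).format(dataSourceFormat).load()
      empDF.filter(empDF.empName == "Chauhan").select("empId", "empName").show()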
