How to connect HBase and Spark using Python?

Problem description

I have an embarrassingly parallel task for which I use Spark to distribute the computations. These computations are in Python, and I use PySpark to read and preprocess the data. The input data to my task is stored in HBase. Unfortunately, I've yet to find a satisfactory (i.e., easy to use and scalable) way to read/write HBase data from/to Spark using Python.

What I've explored previously:

  • Connecting from within my Python processes using happybase. This package allows connecting to HBase from Python via HBase's Thrift API. This way, I basically skip Spark for data reading/writing and miss out on potential HBase-Spark optimizations. Read speeds seem reasonably fast, but write speeds are slow. This is currently my best solution (a minimal happybase sketch follows this list).

  • Using SparkContext's newAPIHadoopRDD and saveAsNewAPIHadoopDataset, which make use of HBase's MapReduce interface. Examples for this were once included in the Spark code base (see here). However, these are now considered outdated in favor of HBase's Spark bindings (see here). I've also found this method to be slow and cumbersome (for reading; writing worked well), for example because the strings returned from newAPIHadoopRDD had to be parsed and transformed in various ways to end up with the Python objects I wanted. It also only supported one column at a time. A rough sketch of this read path also appears after the list.
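
For reference, the happybase route from the first bullet looks roughly like the sketch below. This is only a minimal sketch under assumed names (a Thrift host 'hbase-thrift-host' and a table 'testtable'); it talks to HBase through the Thrift gateway and bypasses Spark entirely:

import happybase

# Connect to the HBase Thrift server (hostname is a placeholder).
connection = happybase.Connection('hbase-thrift-host')
table = connection.table('testtable')

# Read a single row.
row = table.row(b'row-1')
print(row.get(b'cf:col1'))

# Write rows in a batch; batching helps, but throughput is still limited by Thrift.
with table.batch(batch_size=1000) as batch:
    batch.put(b'row-2', {b'cf:col1': b'1.0'})
    batch.put(b'row-3', {b'cf:col1': b'2.0'})

connection.close()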
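
The deprecated MapReduce-based read path from the second bullet looked roughly like this, modelled on the hbase_inputformat.py example that used to ship with Spark. The ZooKeeper host and table name are placeholders, and the Python converter classes require the spark-examples jar to be on the classpath:

from pyspark import SparkContext

sc = SparkContext()

conf = {
    'hbase.zookeeper.quorum': 'zookeeper-host',
    'hbase.mapreduce.inputtable': 'testtable',
}

# Each record comes back as a pair of strings that still has to be parsed and
# transformed to get the Python objects I actually want.
hbase_rdd = sc.newAPIHadoopRDD(
    'org.apache.hadoop.hbase.mapreduce.TableInputFormat',
    'org.apache.hadoop.hbase.io.ImmutableBytesWritable',
    'org.apache.hadoop.hbase.client.Result',
    keyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter',
    valueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter',
    conf=conf,
)
print(hbase_rdd.take(1))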

Alternatives I'm aware of:

  • I'm currently using Cloudera's CDH, and version 5.7.0 offers hbase-spark (see the CDH release notes and a detailed blog post). This module (formerly known as SparkOnHBase) will officially be a part of HBase 2.0. Unfortunately, this wonderful solution seems to work only with Scala/Java.

  • Huawei's Spark-SQL-on-HBase / Astro (I don't see a difference between the two...). It does not look as robust and well-supported as I'd like my solution to be.

Recommended answer

I found this comment by one of the makers of hbase-spark, which seems to suggest there is a way to use PySpark to query HBase using Spark SQL.

And indeed, the pattern described here can be applied to query HBase with Spark SQL using PySpark, as the following example shows:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

# Data source of the hbase-spark module shipped with CDH (see below for the shc alternative).
data_source_format = 'org.apache.hadoop.hbase.spark'

# A small example DataFrame; 'col0' is mapped to the HBase row key by the catalog below.
df = sc.parallelize([('a', '1.0'), ('b', '2.0')]).toDF(schema=['col0', 'col1'])

# ''.join(string.split()) collapses the whitespace so the catalog can be written here as a multi-line JSON string.
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"testtable"},
    "rowkey":"key",
    "columns":{
        "col0":{"cf":"rowkey", "col":"key", "type":"string"},
        "col1":{"cf":"cf", "col":"col1", "type":"string"}
    }
}""".split())


# Writing (alternatively, pass the catalog with .option('catalog', catalog))
df.write \
    .options(catalog=catalog) \
    .format(data_source_format) \
    .save()

# Reading
df = sqlc.read \
    .options(catalog=catalog) \
    .format(data_source_format) \
    .load()

I've tried hbase-spark-1.2.0-cdh5.7.0.jar (as distributed by Cloudera) for this, but ran into trouble (org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select when writing; java.util.NoSuchElementException: None.get when reading). As it turns out, the current version of CDH does not include the changes to hbase-spark that enable this Spark SQL-HBase integration.

What does work for me is the shc Spark package, found here. The only change I had to make to the script above is:

data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'

Here's how I submit the above script on my CDH cluster, following the example from the shc README:

spark-submit --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/ --files /opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml example.py

Most of the work on shc seems to already be merged into the hbase-spark module of HBase, for release in version 2.0. With that, Spark SQL querying of HBase is possible using the above-mentioned pattern (see: https://hbase.apache.org/book.html#_sparksql_dataframes for details). My example above shows what it looks like for PySpark users.

Finally, a caveat: my example data above has only strings. Python data conversion is not supported by shc, so I ran into problems with integers and floats either not showing up in HBase or showing up with strange values. A possible workaround is sketched below.
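
One way around this (my own sketch, not something shc provides; it reuses the df, catalog, sqlc, and data_source_format defined above and assumes a numeric 'col1') is to cast numeric columns to strings before writing and cast them back after reading:

from pyspark.sql.functions import col

# Write numeric data as strings so it round-trips through HBase cleanly.
df_str = df.withColumn('col1', col('col1').cast('string'))
df_str.write \
    .options(catalog=catalog) \
    .format(data_source_format) \
    .save()

# Read it back and cast the column to the numeric type you expect.
df_back = sqlc.read \
    .options(catalog=catalog) \
    .format(data_source_format) \
    .load()
df_back = df_back.withColumn('col1', col('col1').cast('double'))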
