Query HIVE table in pyspark


Problem Description


I am using CDH5.5

I have a table created in HIVE default database and able to query it from the HIVE command.

Output

hive> use default;
OK
Time taken: 0.582 seconds

hive> show tables;
OK
bank
Time taken: 0.341 seconds, Fetched: 1 row(s)

hive> select count(*) from bank;
OK
542
Time taken: 64.961 seconds, Fetched: 1 row(s)

However, I am unable to query the table from pyspark as it cannot recognize the table.

from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

sqlContext.sql("use default")
DataFrame[result: string]

sqlContext.sql("show tables").show()
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
+---------+-----------+

sqlContext.sql("FROM bank SELECT count(*)")

16/03/16 20:12:13 INFO parse.ParseDriver: Parsing command: FROM bank SELECT count(*)
16/03/16 20:12:13 INFO parse.ParseDriver: Parse Completed
Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql
      return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",   line 538, in __call__
    File "/usr/lib/spark/python/pyspark/sql/utils.py", line 40, in deco
      raise AnalysisException(s.split(': ', 1)[1])
  **pyspark.sql.utils.AnalysisException: no such table bank; line 1 pos 5**

New Error

>>> from pyspark.sql import HiveContext
>>> hive_context = HiveContext(sc)
>>> bank = hive_context.table("default.bank")
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:50 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/context.py", line 565, in table
    return DataFrame(self._ssql_ctx.table(tableName), self)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o22.table.
: org.apache.spark.sql.catalyst.analysis.NoSuchTableException
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123)
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.hive.client.ClientInterface$class.getTable(ClientInterface.scala:123)
    at org.apache.spark.sql.hive.client.ClientWrapper.getTable(ClientWrapper.scala:60)
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:406)
    at org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:422)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:203)
    at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:422)
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:739)
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:735)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

Thanks

Solution

We cannot pass the Hive table name directly to the HiveContext sql method in this setup, since it does not recognize the Hive table name there. One way to read a Hive table from the pyspark shell is:

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
bank = hive_context.table("default.bank")
bank.show()
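
Since bank is an ordinary PySpark DataFrame at this point, the usual DataFrame API works on it as well. A minimal sketch (the balance column below is only an assumed example; substitute a real column from your table):

bank.printSchema()                        # columns and types read from the Hive metastore
print(bank.count())                       # same figure as "select count(*) from bank" in the Hive CLI
bank.filter(bank.balance > 1000).show(5)  # "balance" is an assumed column name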

To run SQL on the Hive table: first register the DataFrame obtained from reading the Hive table as a temporary table, then run the SQL query against it.

bank.registerTempTable("bank_temp")
hive_context.sql("select * from bank_temp").show()
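
Note: on Spark 2.x and later, SparkSession replaces HiveContext as the entry point (HiveContext is deprecated there). A rough equivalent of the above, assuming the session is built with Hive support enabled and hive-site.xml is on Spark's conf path (the application name is arbitrary), would be:

from pyspark.sql import SparkSession

# Build a session that can talk to the Hive metastore.
spark = SparkSession.builder.appName("hive-example").enableHiveSupport().getOrCreate()

bank = spark.table("default.bank")                        # read the Hive table as a DataFrame
bank.show()
spark.sql("select count(*) from default.bank").show()     # SQL can reference the Hive table directly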
