How to create a table as select in pyspark.sql


Question

Is it possible to create a table on Spark using a SELECT statement?

I do the following:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sqlCtx = SQLContext(sc)

spark_df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data/documents_topics.csv")
spark_df.registerTempTable("my_table")

sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")

But I get the error:

/Users/user/anaconda/bin/python /Users/user/workspace/Outbrain-Click-Prediction/test.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/01/21 17:19:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o19.sql.
: org.apache.spark.sql.AnalysisException: unresolved operator 'CreateHiveTableAsSelectLogicalPlan CatalogTable(
    Table: my_table_2
    Created: Sat Jan 21 17:19:53 EST 2017
    Last Access: Wed Dec 31 18:59:59 EST 1969
    Type: MANAGED
    Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false;;
'CreateHiveTableAsSelectLogicalPlan CatalogTable(
    Table: my_table_2
    Created: Sat Jan 21 17:19:53 EST 2017
    Last Access: Wed Dec 31 18:59:59 EST 1969
    Type: MANAGED
    Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
: +- Project [document_id#0, topic_id#1, confidence_level#2]
: +- SubqueryAlias my_table
: +- Relation[document_id#0,topic_id#1,confidence_level#2] csv

at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:374)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/user/workspace/Outbrain-Click-Prediction/test.py", line 16, in <module>
    sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/context.py", line 360, in sql
    return self.sparkSession.sql(sqlQuery)
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/session.py", line 543, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "unresolved operator 'CreateHiveTableAsSelectLogicalPlan CatalogTable(\n\tTable: my_table_2\n\tCreated: Sat Jan 21 17:19:53 EST 2017\n\tLast Access: Wed Dec 31 18:59:59 EST 1969\n\tType: MANAGED\n\tStorage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false;;\n'CreateHiveTableAsSelectLogicalPlan CatalogTable(\n\tTable: my_table_2\n\tCreated: Sat Jan 21 17:19:53 EST 2017\n\tLast Access: Wed Dec 31 18:59:59 EST 1969\n\tType: MANAGED\n\tStorage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false\n: +- Project [document_id#0, topic_id#1, confidence_level#2]\n: +- SubqueryAlias my_table\n: +- Relation[document_id#0,topic_id#1,confidence_level#2] csv\n"

Answer

The plain SQLContext cannot execute Hive DDL such as CREATE TABLE ... AS SELECT, which is what the "unresolved operator 'CreateHiveTableAsSelectLogicalPlan" error is complaining about. I've corrected this issue by using HiveContext, which adds Hive support, instead of SQLContext, as below:

import findspark
findspark.init()
import pyspark
from pyspark.sql import HiveContext

# a SparkContext is still needed; it was missing from the original snippet
sc = pyspark.SparkContext()
sqlCtx = HiveContext(sc)  # HiveContext supports Hive DDL such as CREATE TABLE ... AS SELECT

spark_df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data/documents_topics.csv")
spark_df.registerTempTable("my_table")

sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")

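Alternatively, if the goal is only to persist the DataFrame as a managed table, the DataFrameWriter API gives the same result without writing any SQL; a sketch under the same assumptions:

# equivalent to CREATE TABLE my_table_2 AS SELECT * FROM my_table
spark_df.write.saveAsTable("my_table_2")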