指定列的 Spark sql 问题 [英] Spark sql issue with columns specified
问题描述
我们正在尝试将一个 oracle 数据库复制到 hive 中.我们从 oracle 获取查询并在 hive 中运行它们.因此,我们以这种格式获取它们:
INSERT INTO schema.table(col1,col2) VALUES ('val','val');
虽然此查询直接在 Hive 中工作,但当我使用 spark.sql 时,出现以下错误:
org.apache.spark.sql.catalyst.parser.ParseException:不匹配的输入 'emp_id' 期望 {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 20)== SQL ==插入 ss.tab(emp_id,firstname,lastname) 值 ('1','demo','demo')--------------------^^^在 org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)在 org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)在 org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)在 org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:68)在 org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)在 org.apache.spark.sql.SQLContext.sql(SQLContext.scala:691)在 com.datastream.SparkReplicator.insertIntoHive(SparkReplicator.java:20)在 com.datastream.App.main(App.java:67)在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)在 java.lang.reflect.Method.invoke(Method.java:498)在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
出现此错误是因为 Spark SQL 不支持插入语句中的列列表.所以从插入语句中排除列列表.
下面是我的蜂巢表:
select * from UDB.emp_details_table;+---------+-----------+-----------+------------------+----+|emp_id |emp_name |emp_dept |emp_joining_date |+---------+-----------+-----------+------------------+----+|1 |AAA |人力资源 |2018-12-06 ||1 |BBB |人力资源 |2017-10-26 ||2 |XXX |管理员 |2018-10-22 ||2 |年年 |管理员 |2015-10-19 ||2 |ZZZ |资讯科技 |2018-05-14 ||3 |GGG |人力资源 |2018-06-30 |+---------+-----------+-----------+------------------+----+
这里我通过pyspark使用spark sql插入记录
df = spark.sql("""insert into UDB.emp_details_table values ('6','VVV','IT','2018-12-18')""");
您可以在下面看到给定的记录已插入到我现有的 hive 表中.
+---------+-----------+-----------+-------------------+--+|emp_id |emp_name |emp_dept |emp_joining_date |+---------+-----------+-----------+------------------+----+|1 |AAA |人力资源 |2018-12-06 ||1 |BBB |人力资源 |2017-10-26 ||2 |XXX |管理员 |2018-10-22 ||2 |年年 |管理员 |2015-10-19 ||2 |ZZZ |资讯科技 |2018-05-14 ||3 |GGG |人力资源 |2018-06-30 ||6 |VVV |资讯科技 |2018-12-18 |+---------+-----------+-----------+------------------+----+
将您的 spark sql 查询更改为:
spark.sql("""insert into ss.tab values ('1','demo','demo')""");
<块引用>
注意:我使用的是 spark 2.3,如果您需要使用 hive 上下文正在使用 spark 1.6 版本.
让我知道它是否有效.
we are trying to replicate an oracle db into hive. We get the queries from oracle and run them in hive. So, we get them in this format:
INSERT INTO schema.table(col1,col2) VALUES ('val','val');
While this query works in Hive directly, when I use spark.sql, I get the following error:
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'emp_id' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 20)
== SQL ==
insert into ss.tab(emp_id,firstname,lastname) values ('1','demo','demo')
--------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:68)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:691)
at com.datastream.SparkReplicator.insertIntoHive(SparkReplicator.java:20)
at com.datastream.App.main(App.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
This error is coming as Spark SQL does not support column lists in the insert statement. so exclude the column list from the insert statement.
below was my hive table:
select * from UDB.emp_details_table;
+---------+-----------+-----------+-------------------+--+
| emp_id | emp_name | emp_dept | emp_joining_date |
+---------+-----------+-----------+-------------------+--+
| 1 | AAA | HR | 2018-12-06 |
| 1 | BBB | HR | 2017-10-26 |
| 2 | XXX | ADMIN | 2018-10-22 |
| 2 | YYY | ADMIN | 2015-10-19 |
| 2 | ZZZ | IT | 2018-05-14 |
| 3 | GGG | HR | 2018-06-30 |
+---------+-----------+-----------+-------------------+--+
here I am inserting record using spark sql through pyspark
df = spark.sql("""insert into UDB.emp_details_table values ('6','VVV','IT','2018-12-18')""");
you could see below that given record has been inserted to my existing hive table.
+---------+-----------+-----------+-------------------+--+
| emp_id | emp_name | emp_dept | emp_joining_date |
+---------+-----------+-----------+-------------------+--+
| 1 | AAA | HR | 2018-12-06 |
| 1 | BBB | HR | 2017-10-26 |
| 2 | XXX | ADMIN | 2018-10-22 |
| 2 | YYY | ADMIN | 2015-10-19 |
| 2 | ZZZ | IT | 2018-05-14 |
| 3 | GGG | HR | 2018-06-30 |
| 6 | VVV | IT | 2018-12-18 |
+---------+-----------+-----------+-------------------+--+
change your spark sql query as :
spark.sql("""insert into ss.tab values ('1','demo','demo')""");
Note: I am using spark 2.3, you need to use hive context in case you are using spark 1.6 version.
Let me know if it works.
这篇关于指定列的 Spark sql 问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!