Read from a Hive table and write back to it using Spark SQL

Question

I am reading a Hive table using Spark SQL and assigning it to a Scala val:

val x = sqlContext.sql("select * from some_table")

Then I do some processing on the DataFrame x and end up with a DataFrame y, which has exactly the same schema as the table some_table.

Finally, I try to insert-overwrite the DataFrame y into the same Hive table, some_table:

y.write.mode(SaveMode.Overwrite).insertInto("some_table")

Then I get the error:

org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from

I also tried building an INSERT SQL statement and running it with sqlContext.sql(), but that gave me the same error.

Is there any way to get around this error? I need to insert the records back into the same table.

Hi, I tried doing as suggested, but I am still getting the same error:

val x = sqlContext.sql("select * from incremental.test2")
val y = x.limit(5)
y.registerTempTable("temp_table")
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("incremental.test2")

scala> dy.write.mode("overwrite").insertInto("incremental.test2")
             org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;

Recommended answer

You can also use checkpointing to achieve this. Because a checkpoint truncates the data lineage, Spark can no longer detect that you are reading from and overwriting the same table:

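// checkpointDir: a directory of your choice on reliable storage (for example HDFS)
sqlContext.sparkContext.setCheckpointDir(checkpointDir)
// checkpoint() materializes the data and cuts the lineage back to some_table
val ds = sqlContext.sql("select * from some_table").checkpoint()
ds.write.mode("overwrite").saveAsTable("some_table")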
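
For reference, here is a minimal sketch of the same idea applied to the read, process, write-back flow from the question. It assumes Spark 2.1 or later (where Dataset.checkpoint() is available) and uses the SparkSession entry point instead of sqlContext. The object name, the checkpoint path /tmp/spark-checkpoints, and the limit(5) step standing in for the real processing are illustrative assumptions, not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession

object OverwriteSameTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("overwrite-same-table")
      .enableHiveSupport() // needed to read and write Hive tables
      .getOrCreate()

    // Assumption: a hypothetical directory on reliable storage for checkpoint files.
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    // Read the source table and apply the processing (limit(5) is a stand-in).
    val x = spark.table("incremental.test2")
    val processed = x.limit(5)

    // checkpoint() is eager by default: it writes the rows to the checkpoint
    // directory and returns a Dataset whose plan no longer refers to the table.
    val y = processed.checkpoint()

    // Overwrite the table that was read, as in the answer above.
    y.write.mode("overwrite").saveAsTable("incremental.test2")

    spark.stop()
  }
}
```

The checkpoint has to be taken on the final DataFrame, after all processing, so that nothing in the plan being written still points at the source table. The checkpoint files are not necessarily cleaned up automatically, so the directory may need periodic cleanup.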
