Spark and Hive table schema out of sync after external overwrite


Problem Description

I'm having issues with the schema for Hive tables being out of sync between Spark and Hive on a MapR cluster with Spark 2.1.0 and Hive 2.1.1.

I need to resolve this problem specifically for managed tables, but the issue can be reproduced with unmanaged/external tables.

Steps to reproduce:

  1. Use saveAsTable to save a dataframe to a given table.
  2. Use mode("overwrite").parquet("path/to/table") to overwrite the data for the previously saved table. (I am actually modifying the data through a process external to Spark and Hive, but this reproduces the same issue.)
  3. Use spark.catalog.refreshTable(...) to refresh the metadata.
  4. Query the table with spark.table(...).show(). Any columns that were the same between the original dataframe and the overwriting one show the new data correctly, but any columns that exist only in the new data are not displayed.

Example

db_name = "test_39d3ec9"
table_name = "overwrite_existing"
table_location = "<spark.sql.warehouse.dir>/{}.db/{}".format(db_name, table_name)

qualified_table = "{}.{}".format(db_name, table_name)
spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(db_name))

Save as a managed table

existing_df = spark.createDataFrame([(1, 2)])
existing_df.write.mode("overwrite").saveAsTable(table_name)

Note that saving as an unmanaged table with the following will produce the same issue:

existing_df.write.mode("overwrite") \
    .option("path", table_location) \
    .saveAsTable(qualified_table)

View the contents of the table

spark.table(table_name).show()
+---+---+
| _1| _2|
+---+---+
|  1|  2|
+---+---+

Overwrite the parquet files directly

new_df = spark.createDataFrame([(3, 4, 5, 6)], ["_4", "_3", "_2", "_1"])
new_df.write.mode("overwrite").parquet(table_location)

View the contents with the parquet reader; the contents show correctly

spark.read.parquet(table_location).show()
+---+---+---+---+
| _4| _3| _2| _1|
+---+---+---+---+
|  3|  4|  5|  6|
+---+---+---+---+

Refresh Spark's metadata for the table and read it in again as a table. The data is updated for the columns that were the same, but the additional columns are not displayed.

spark.catalog.refreshTable(qualified_table)
spark.table(qualified_table).show()
+---+---+
| _1| _2|
+---+---+
|  6|  5|
+---+---+

I have also tried updating the schema in Hive before calling spark.catalog.refreshTable, using the command below in the Hive shell:

ALTER TABLE test_39d3ec9.overwrite_existing REPLACE COLUMNS (`_1` bigint, `_2` bigint, `_3` bigint, `_4` bigint);

After running the ALTER command, I then run DESCRIBE and it shows correctly in Hive:

DESCRIBE test_39d3ec9.overwrite_existing
OK
_1                      bigint
_2                      bigint
_3                      bigint
_4                      bigint

Before running the ALTER command it only shows the original columns, as expected:

DESCRIBE test_39d3ec9.overwrite_existing
OK
_1                      bigint
_2                      bigint

I then ran spark.catalog.refreshTable, but it didn't affect Spark's view of the data.
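To illustrate that verification step, here is a minimal PySpark sketch, assuming the REPLACE COLUMNS statement above has already been run in the Hive shell and using the table name from the example:

spark.catalog.refreshTable("test_39d3ec9.overwrite_existing")
# Still shows only the original _1 and _2 columns, despite the Hive-side schema change
spark.table("test_39d3ec9.overwrite_existing").show()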

On the Spark side, I did most of my testing with PySpark, but I also tested in a spark-shell (Scala) and a spark-sql shell. While in the spark-shell I also tried using a HiveContext, but that didn't work either:

import org.apache.spark.sql.hive.HiveContext
import spark.sqlContext.implicits._
val hiveObj = new HiveContext(sc)
hiveObj.refreshTable("test_39d3ec9.overwrite_existing")

After performing the ALTER command in the Hive shell, I verified in Hue that the schema had also changed there.

I also tried running the ALTER command with spark.sql("ALTER ..."), but the version of Spark we are on (2.1.0) does not allow it, and it looks like this won't be available until Spark 2.2.0, based on this issue: https://issues.apache.org/jira/browse/SPARK-19261
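For reference, the attempt looked roughly like this; it is the same REPLACE COLUMNS statement as above, just issued through spark.sql, and Spark 2.1.0 rejects it:

# Rejected on Spark 2.1.0; column-altering DDL through Spark SQL is tracked in SPARK-19261
spark.sql("""
    ALTER TABLE test_39d3ec9.overwrite_existing
    REPLACE COLUMNS (`_1` bigint, `_2` bigint, `_3` bigint, `_4` bigint)
""")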

I have also read through the Spark docs again, specifically this section: https://spark.apache.org/docs/2.1.0/sql-programming-guide.html#hive-metastore-parquet-table-conversion

Based on those docs, spark.catalog.refreshTable should work. The configuration for spark.sql.hive.convertMetastoreParquet is typically false, but I switched it to true for testing and it didn't seem to affect anything.
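A minimal sketch of that test, assuming an active SparkSession named spark and the example table from above:

# Toggle the conversion setting for this session and re-check the table schema
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.catalog.refreshTable("test_39d3ec9.overwrite_existing")
spark.table("test_39d3ec9.overwrite_existing").printSchema()  # still the stale two-column schema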

Any help would be appreciated, thank you!

Recommended Answer

I faced a similar issue while using Spark 2.2.0 in the CDH 5.11.x package.

After spark.write.mode("overwrite").saveAsTable(), when I issued spark.read.table().show(), no data would be displayed.

On checking, I found it was a known issue with the CDH Spark 2.2.0 version. The workaround was to run the commands below after the saveAsTable command was executed.

spark.sql("ALTER TABLE qualified_table set SERDEPROPERTIES ('path'='hdfs://{hdfs_host_name}/{table_path}')")

spark.catalog.refreshTable("qualified_table")

e.g.: If your table LOCATION is like hdfs://hdfsHA/user/warehouse/example.db/qualified_table,
then assign 'path'='hdfs://hdfsHA/user/warehouse/example.db/qualified_table'
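Putting that together for the table from the question, a sketch of the workaround might look like the following; the HDFS host and warehouse path below are placeholders, not values from the original post, and must be replaced with the table's actual LOCATION (e.g. from DESCRIBE FORMATTED):

# Placeholder location; substitute the real LOCATION of your table
table_path = "hdfs://hdfsHA/user/warehouse/test_39d3ec9.db/overwrite_existing"
spark.sql(
    "ALTER TABLE test_39d3ec9.overwrite_existing "
    "SET SERDEPROPERTIES ('path'='{}')".format(table_path)
)
spark.catalog.refreshTable("test_39d3ec9.overwrite_existing")
spark.table("test_39d3ec9.overwrite_existing").show()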

This worked for me. Give it a try. I assume by now your issue would have been resolved; if not, you can try this method.

Workaround source: https://www.cloudera.com/documentation/spark2/2-2-x/topics/spark2_known_issues.html
