尝试覆盖 Hive 分区时写入 __HIVE_DEFAULT_PARTITION__ 的损坏行 [英] Corrupt rows written to __HIVE_DEFAULT_PARTITION__ when attempting to overwrite Hive partition

查看:39
本文介绍了尝试覆盖 Hive 分区时写入 __HIVE_DEFAULT_PARTITION__ 的损坏行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在尝试使用 Spark 2.3 覆盖 Hive 表中的分区时,我看到了一些非常奇怪的行为

I am seeing some very odd behaviour when attempting to overwrite a partition in a Hive table using Spark 2.3

首先,我在构建 SparkSession 时设置以下设置:

Firstly I am setting the following setting when building my SparkSession:

.config("spark.sql.sources.partitionOverwriteMode", "dynamic")

然后我将一些数据复制到新表中并按 date_id 列进行分区.

I am then copying some data into new table and partitioning by the date_id column.

ds
  .write
  .format("parquet")
  .option("compression", "snappy")
  .option("auto.purge", "true")
  .mode(saveMode)
  .partitionBy("date_id")
  .saveAsTable("tbl_copy")

我可以在 HDFS 中看到相关的 date_id 目录已经创建.

I can see in HDFS that the relevant date_id directories have been created.

然后我创建一个包含我希望覆盖的分区的数据的数据集,其中包含单个 date_id 的数据并插入到 Hive 中,如下所示:

I then create a DataSet containing data for the partition I wish to overwrite which contains data for a single date_id and insert into Hive as follows:

  ds
    .write
    .mode(SaveMode.Overwrite)
    .insertInto("tbl_copy")

作为完整性检查,我将相同的数据集写入新表.

As a sanity check I write the same Dataset to a new table.

      ds
        .write
        .format("parquet")
        .option("compression", "snappy")
        .option("auto.purge", "true")
        .mode(SaveMode.Overwrite)
        .saveAsTable("tmp_tbl")

tmp_tbl 中的数据完全符合预期.

The data in tmp_tbl is exactly as expected.

但是,当我查看 tbl_copy 时,我看到一个新的 HDFS 目录 `date_id=HIVE_DEFAULT_PARTITION

However when I look at tbl_copy I see a new HDFS directory `date_id=HIVE_DEFAULT_PARTITION

查询tbl_cpy

SELECT * from tbl_copy WHERE date_id IS NULL

我看到应该插入到分区 date_id=20180523 中的行,但是 date_id 列为空,并且不相关的 row_changed 列填充了值 20180523.

I see the rows that should have been inserted into partition date_id=20180523 however the date_id column is null and an unrelated row_changed column has been populated with value 20180523.

似乎对 Hive 的插入以某种方式导致我的数据被破坏.将相同的数据集写入新表不会导致任何问题.

It appears the insert into Hive is somehow causing my data to get mangled. Writing the same Dataset into a new table causes no issues.

有人能解释一下吗?

推荐答案

所以看起来分区列必须是数据集中的最后一个.

So it appears that partition columns must be the last ones in the Dataset.

我已经通过将以下方法拉到 Dataset[T] 上解决了这个问题.

I have solved the problem by pimping the following method onto Dataset[T].

def partitionsTail(partitionColumns: Seq[String]) = {
  val columns = dataset.schema.collect{ case s if !partitionColumns.contains(s.name) => s.name} ++ partitionColumns

  dataset.select(columns.head, columns.tail: _*).as[T]
} 

这篇关于尝试覆盖 Hive 分区时写入 __HIVE_DEFAULT_PARTITION__ 的损坏行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆