How to write a table to hive from spark without using the warehouse connector in HDP 3.1

Problem description

When trying to use Spark 2.3 on HDP 3.1 to write to a Hive table without the warehouse connector, directly into Hive's schema, using:

spark-shell --driver-memory 16g --master local[3] --conf spark.hadoop.metastore.catalog.default=hive
val df = Seq(1,2,3,4).toDF
spark.sql("create database foo")
df.write.saveAsTable("foo.my_table_01")

it fails with:

Table foo.my_table_01 failed strict managed table checks due to the following reason: Table is marked as a managed table but is not transactional

But:

import org.apache.spark.sql.SaveMode

val df = Seq(1,2,3,4).toDF.withColumn("part", col("value"))
df.write.partitionBy("part").option("compression", "zlib").mode(SaveMode.Overwrite).format("orc").saveAsTable("foo.my_table_02")

Spark with spark.sql("select * from foo.my_table_02").show works just fine. Now going to Hive / beeline:

0: jdbc:hive2://hostname:2181/> select * from my_table_02;
Error: java.io.IOException: java.lang.IllegalArgumentException: bucketId out of range: -1 (state=,code=0)

A

describe extended my_table_02;

returns

 +-----------------------------+----------------------------------------------------+----------+
|          col_name           |                     data_type                      | comment  |
+-----------------------------+----------------------------------------------------+----------+
| value                       | int                                                |          |
| part                        | int                                                |          |
|                             | NULL                                               | NULL     |
| # Partition Information     | NULL                                               | NULL     |
| # col_name                  | data_type                                          | comment  |
| part                        | int                                                |          |
|                             | NULL                                               | NULL     |
| Detailed Table Information  | Table(tableName:my_table_02, dbName:foo, owner:hive/bd-sandbox.t-mobile.at@SANDBOX.MAGENTA.COM, createTime:1571201905, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:value, type:int, comment:null), FieldSchema(name:part, type:int, comment:null)], location:hdfs://bd-sandbox.t-mobile.at:8020/warehouse/tablespace/external/hive/foo.db/my_table_02, inputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:{path=hdfs://bd-sandbox.t-mobile.at:8020/warehouse/tablespace/external/hive/foo.db/my_table_02, compression=zlib, serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[FieldSchema(name:part, type:int, comment:null)], parameters:{numRows=0, rawDataSize=0, spark.sql.sources.schema.partCol.0=part, transient_lastDdlTime=1571201906, bucketing_version=2, spark.sql.create.version=2.3.2.3.1.0.0-78, totalSize=740, spark.sql.sources.schema.numPartCols=1, spark.sql.sources.schema.part.0={\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}, numFiles=4, numPartitions=4, spark.sql.partitionProvider=catalog, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=orc, transactional=true}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, rewriteEnabled:false, catName:hive, ownerType:USER, writeId:-1) |

How can I use Spark to write to Hive without using the warehouse connector, but still write to the same metastore so that the table can later be read by Hive? To the best of my knowledge external tables should be possible (they are not managed, not ACID, not transactional), but I am not sure how to tell saveAsTable to create one.
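One direction that might work (not from the original post, and untested on this exact setup) is to pass an explicit path to saveAsTable: Spark registers tables with a user-supplied location as EXTERNAL, which should sidestep the strict managed/ACID check. The HDFS location below is purely illustrative:

import org.apache.spark.sql.SaveMode

val df = Seq(1,2,3,4).toDF

// Supplying an explicit location should make Spark create an EXTERNAL table
// in the metastore instead of a managed one (the path below is a placeholder).
df.write
  .mode(SaveMode.Overwrite)
  .format("orc")
  .option("path", "/warehouse/tablespace/external/hive/foo.db/my_table_03")
  .saveAsTable("foo.my_table_03")

Whether Hive can then read the resulting ORC files without further tweaks is a separate question; see the answer below.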

Related questions:

  • https://community.cloudera.com/t5/Support-Questions/In-hdp-3-0-can-t-create-hive-table-in-spark-failed/td-p/202647
  • Table loaded through Spark not accessible in Hive
    • Setting the properties suggested in the answers there does not solve my issue

A workaround might be something like https://github.com/qubole/spark-acid, similar in spirit to https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.4/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html, but I do not like the idea of adding more duct tape when I have not seen any large-scale performance tests of it yet. Also, this would mean changing all existing Spark jobs.

In fact, Cant save table to hive metastore, HDP 3.0 reports issues with large data frames and the warehouse connector.

I just found https://community.cloudera.com/t5/Support-Questions/Spark-hive-warehouse-connector-not-loading-data-when-using/td-p/243613

Also:

execute() vs executeQuery()

executeQuery() will always use HiveServer2-interactive/LLAP, as it uses the fast Arrow protocol. Using it when the JDBC URL points to a non-LLAP HiveServer2 will yield an error.

execute() uses JDBC and does not have this dependency on LLAP, but it has a built-in restriction of returning at most 1,000 records. For most queries (INSERT INTO ... SELECT, count, sum, average) that is not a problem.
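For illustration only, a rough sketch of the two call paths with the Hive Warehouse Connector, assuming the HWC assembly jar is on the classpath and the usual spark.sql.hive.hiveserver2.jdbc.url / spark.datasource.hive.warehouse.* settings are configured (not taken from the original post):

import com.hortonworks.hwc.HiveWarehouseSession

val hive = HiveWarehouseSession.session(spark).build()

// executeQuery(): goes through HiveServer2-interactive/LLAP using the Arrow transport
val viaLlap = hive.executeQuery("SELECT * FROM foo.my_table_02")

// execute(): plain JDBC against HiveServer2, capped at roughly 1,000 returned rows by default
val viaJdbc = hive.execute("SELECT count(*) AS cnt FROM foo.my_table_02")

viaLlap.show()
viaJdbc.show()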

But doesn't this kill any high-performance interoperability between Hive and Spark? Especially if there are not enough LLAP nodes available for large-scale ETL.

In fact, this is true. This setting can be configured at https://github.com/hortonworks-spark/spark-llap/blob/26d164e62b45cfa1420d5d43cdef13d1d29bb877/src/main/java/com/hortonworks/spark/sql/hive/llap/HWConf.java#L39, though I am not sure of the performance impact of increasing this value.
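If that limit ever needs to be raised, it appears to be exposed as a connector option; the key name below is my reading of HWConf and should be treated as an assumption to verify against your HWC build:

// Hypothetical: raise the execute() result cap before building the HWC session.
// Key name derived from HWConf (EXEC_RESULTS_MAX); verify it for your connector version.
spark.conf.set("spark.datasource.hive.warehouse.exec.results.max", "100000")

val hive = com.hortonworks.hwc.HiveWarehouseSession.session(spark).build()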

Recommended answer

    "如何在不使用仓库连接器的情况下使用 spark 写入 hive,但仍然写入相同的 Metastore,以便稍后由 hive 读取?"

    "How can I use spark to write to hive without using the warehouse connector but still writing to the same metastore which can later on be read by hive?"

We are working with the same setup (HDP 3.1 and Spark 2.3). Using the code below we were getting the same error message you got, "bucketId out of range: -1". The solution was to run set hive.fetch.task.conversion=none; in the Hive shell before querying the table.
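The same session setting can also be applied programmatically over plain Hive JDBC; this is only a sketch, and the connection URL and credentials below are placeholders (in practice you can simply run the SET statement in beeline):

import java.sql.DriverManager

// Requires the Hive JDBC driver (org.apache.hive:hive-jdbc) on the classpath.
val conn = DriverManager.getConnection(
  "jdbc:hive2://hostname:10000/foo",  // placeholder HiveServer2 URL
  "hive",                             // placeholder user
  "")                                 // placeholder password
val stmt = conn.createStatement()

// Disable the fetch-task optimisation for this session before reading the table
stmt.execute("SET hive.fetch.task.conversion=none")

val rs = stmt.executeQuery("SELECT * FROM my_table_02")
while (rs.next()) {
  println(s"value=${rs.getInt("value")} part=${rs.getInt("part")}")
}
rs.close(); stmt.close(); conn.close()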

The code to write data into Hive without the HWC:

import java.io.File
import org.apache.spark.sql.{SaveMode, SparkSession}

// Location for Spark's default warehouse directory
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

case class Record(key: Int, value: String)

// Hive-enabled SparkSession writing through the regular Spark catalog
val spark = SparkSession.builder()
  .master("yarn")
  .appName("SparkHiveExample")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

spark.sql("USE databaseName")
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("sparkhive_records")

[Example from https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html]
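As a quick sanity check (not part of the original answer), the table can be read back from the same Spark session; reading it from Hive still needs the hive.fetch.task.conversion=none workaround mentioned above:

// Read back what was just written, using the same Hive-enabled session
val readBack = spark.table("sparkhive_records")
readBack.printSchema()
println(readBack.count())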
