Hive doesn't read partitioned Parquet files generated by Spark


Problem Description


I'm having trouble reading partitioned Parquet files generated by Spark in Hive. I can create the external table in Hive, but when I try to select a few rows, Hive returns only an "OK" message with no rows.

I'm able to read the partitioned Parquet files correctly in Spark, so I'm assuming they were generated correctly. I'm also able to read these files when I create an external table in Hive without partitioning.

Does anyone have a suggestion?
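
For reference, a non-partitioned external table over these files would look roughly like this (a sketch; the table name and the choice of pointing straight at one leaf partition directory are illustrative):

-- Illustrative unpartitioned table over a single partition directory;
-- the two partition columns only exist as directory names, not in the files.
create external table if not exists t3_flat(
  transactionid string,
  eventts timestamp)
stored as parquet
location 's3://staging-dev/test/ttfourfieldspart2/year=2013/month=11/';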

My Environment is:

  • EMR cluster 4.1.0
  • Hive 1.0.0
  • Spark 1.5.0
  • Hue 3.7.1
  • Parquet files are stored in an S3 bucket (s3://staging-dev/test/ttfourfieldspart2/year=2013/month=11)

My Spark config file (/etc/spark/conf.dist/spark-defaults.conf) has the following parameters:

spark.master yarn
spark.driver.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.executor.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///var/log/spark/apps
spark.history.fs.logDirectory hdfs:///var/log/spark/apps
spark.yarn.historyServer.address ip-10-37-161-246.ec2.internal:18080
spark.history.ui.port 18080
spark.shuffle.service.enabled true
spark.driver.extraJavaOptions    -Dlog4j.configuration=file:///etc/spark/conf/log4j.properties -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.memory 4G
spark.driver.memory 4G
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.maxExecutors 100
spark.dynamicAllocation.minExecutors 1

The Hive config file (/etc/hive/conf/hive-site.xml) has the following parameters:

<configuration>

<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
<!-- that are implied by Hadoop setup variables.                                                -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource).                                                                                 -->

<!-- Hive Execution Parameters -->


<property>
  <name>hbase.zookeeper.quorum</name>
  <value>ip-10-xx-xxx-xxx.ec2.internal</value>
  <description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description>
</property>

<property>
  <name>hive.execution.engine</name>
  <value>mr</value>
</property>

  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ip-10-xx-xxx-xxx.ec2.internal:8020</value>
  </property>

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://ip-10-xx-xxx-xxx.ec2.internal:9083</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://ip-10-xx-xxx-xxx.ec2.internal:3306/hive?createDatabaseIfNotExist=true</value>
    <description>username to use against metastore database</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.mariadb.jdbc.Driver</value>
    <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>1R72JFCDG5XaaDTB</value>
  <description>password to use against metastore database</description>
</property>

  <property>
    <name>datanucleus.fixedDatastore</name>
    <value>true</value>
  </property>

  <property>
    <name>mapred.reduce.tasks</name>
    <value>-1</value>
  </property>

  <property>
    <name>mapred.max.split.size</name>
    <value>256000000</value>
  </property>

  <property>
    <name>hive.metastore.connect.retries</name>
    <value>5</value>
  </property>

  <property>
    <name>hive.optimize.sort.dynamic.partition</name>
    <value>true</value>
  </property>

  <property><name>hive.exec.dynamic.partition</name><value>true</value></property>
  <property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property>
  <property><name>hive.exec.max.dynamic.partitions</name><value>10000</value></property>
  <property><name>hive.exec.max.dynamic.partitions.pernode</name><value>500</value></property>

</configuration>

My Python code that reads the partitioned Parquet files:

from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

df7 = sqlContext.read.parquet('s3://staging-dev/test/ttfourfieldspart2/')
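
The write side isn't shown here, but the files were presumably produced with something along these lines (a sketch only; the DataFrame name df is just a placeholder):

# Sketch of how the partitioned layout under
# s3://staging-dev/test/ttfourfieldspart2/ was presumably produced.
# partitionBy() creates the year=.../month=... subdirectories and keeps the
# partition column values in the directory names rather than in the files.
df.write \
    .partitionBy('year', 'month') \
    .parquet('s3://staging-dev/test/ttfourfieldspart2/')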

The Parquet file schema printed by Spark:

>>> df7.schema
StructType(List(StructField(transactionid,StringType,true),StructField(eventts,TimestampType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true)))

>>> df7.printSchema()
root
 |-- transactionid: string (nullable = true)
 |-- eventts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

>>> df7.show(10)
+--------------------+--------------------+----+-----+
|       transactionid|             eventts|year|month|
+--------------------+--------------------+----+-----+
|f7018907-ed3d-49b...|2013-11-21 18:41:...|2013|   11|
|f6d95a5f-d4ba-489...|2013-11-21 18:41:...|2013|   11|
|02b2a715-6e15-4bb...|2013-11-21 18:41:...|2013|   11|
|0e908c0f-7d63-48c...|2013-11-21 18:41:...|2013|   11|
|f83e30f9-950a-4b9...|2013-11-21 18:41:...|2013|   11|
|3425e4ea-b715-476...|2013-11-21 18:41:...|2013|   11|
|a20a6aeb-da4f-4fd...|2013-11-21 18:41:...|2013|   11|
|d2f57e6f-889b-49b...|2013-11-21 18:41:...|2013|   11|
|46f2eda5-408e-44e...|2013-11-21 18:41:...|2013|   11|
|36fb8b79-b2b5-493...|2013-11-21 18:41:...|2013|   11|
+--------------------+--------------------+----+-----+
only showing top 10 rows

The CREATE TABLE statement in Hive:

create external table if not exists t3(
  transactionid string,
  eventts timestamp)
partitioned by (year int, month int)
stored as parquet
location 's3://staging-dev/test/ttfourfieldspart2/';

When I try to select some rows in Hive, it doesn't return any rows:

hive> select * from t3 limit 10;
OK
Time taken: 0.027 seconds
hive> 

Solution

I finally found the problem. When you create a table in Hive for which partitioned data already exists in S3 or HDFS, you need to run a command to update the Hive Metastore with the table's partition structure. Take a look here: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)

The commands are:

MSCK REPAIR TABLE table_name;


And on Hive running in Amazon EMR you can use:

ALTER TABLE table_name RECOVER PARTITIONS;
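
Applied to the table in this question, the recovery step would look roughly like this (SHOW PARTITIONS and the manual ALTER TABLE ... ADD PARTITION variant are shown only for illustration):

-- Register the partition directories that already exist under the table
-- location with the Hive Metastore:
MSCK REPAIR TABLE t3;
-- (on Amazon EMR's Hive: ALTER TABLE t3 RECOVER PARTITIONS;)

-- Verify that the metastore now knows about the partitions:
SHOW PARTITIONS t3;

-- Alternatively, a single partition can be registered by hand:
ALTER TABLE t3 ADD PARTITION (year=2013, month=11)
LOCATION 's3://staging-dev/test/ttfourfieldspart2/year=2013/month=11';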
