Hive doesn't read partitioned parquet files generated by Spark

This article describes how to deal with Hive not reading partitioned parquet files generated by Spark, and should be a useful reference for anyone hitting the same problem.

Problem description



I'm having a problem reading partitioned parquet files generated by Spark in Hive. I can create the external table in Hive, but when I try to select a few rows, Hive returns only an "OK" message with no rows.

I can read the partitioned parquet files correctly in Spark, so I assume they were generated correctly. I can also read these files when I create an external table in Hive without partitioning.

Does anyone have a suggestion?

My environment is:

  • EMR cluster 4.1.0
  • Hive 1.0.0
  • Spark 1.5.0
  • Hue 3.7.1
  • Parquet files are stored in an S3 bucket (s3://staging-dev/test/ttfourfieldspart2/year=2013/month=11)

My Spark config file has the following parameters (/etc/spark/conf.dist/spark-defaults.conf):

spark.master yarn
spark.driver.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.executor.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///var/log/spark/apps
spark.history.fs.logDirectory hdfs:///var/log/spark/apps
spark.yarn.historyServer.address ip-10-37-161-246.ec2.internal:18080
spark.history.ui.port 18080
spark.shuffle.service.enabled true
spark.driver.extraJavaOptions    -Dlog4j.configuration=file:///etc/spark/conf/log4j.properties -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.memory 4G
spark.driver.memory 4G
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.maxExecutors 100
spark.dynamicAllocation.minExecutors 1

My Hive config file has the following parameters (/etc/hive/conf/hive-site.xml):

<configuration>

<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
<!-- that are implied by Hadoop setup variables.                                                -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource).                                                                                 -->

<!-- Hive Execution Parameters -->


<property>
  <name>hbase.zookeeper.quorum</name>
  <value>ip-10-xx-xxx-xxx.ec2.internal</value>
  <description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description>
</property>

<property>
  <name>hive.execution.engine</name>
  <value>mr</value>
</property>

  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ip-10-xx-xxx-xxx.ec2.internal:8020</value>
  </property>

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://ip-10-xx-xxx-xxx.ec2.internal:9083</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://ip-10-xx-xxx-xxx.ec2.internal:3306/hive?createDatabaseIfNotExist=true</value>
    <description>username to use against metastore database</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.mariadb.jdbc.Driver</value>
    <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>1R72JFCDG5XaaDTB</value>
  <description>password to use against metastore database</description>
</property>

  <property>
    <name>datanucleus.fixedDatastore</name>
    <value>true</value>
  </property>

  <property>
    <name>mapred.reduce.tasks</name>
    <value>-1</value>
  </property>

  <property>
    <name>mapred.max.split.size</name>
    <value>256000000</value>
  </property>

  <property>
    <name>hive.metastore.connect.retries</name>
    <value>5</value>
  </property>

  <property>
    <name>hive.optimize.sort.dynamic.partition</name>
    <value>true</value>
  </property>

  <property><name>hive.exec.dynamic.partition</name><value>true</value></property>
  <property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property>
  <property><name>hive.exec.max.dynamic.partitions</name><value>10000</value></property>
  <property><name>hive.exec.max.dynamic.partitions.pernode</name><value>500</value></property>

</configuration>

My Python code that reads the partitioned parquet files:

from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

df7 = sqlContext.read.parquet('s3://staging-dev/test/ttfourfieldspart2/')
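
For context, a layout like year=2013/month=11 is what Spark's DataFrameWriter produces when the data is written with partitionBy. The original write job isn't shown here, but a minimal sketch that yields this layout (assuming a DataFrame df that already has the year and month columns) would be:

# Hypothetical write side: partitionBy() creates the year=.../month=...
# subdirectories under the target path that Hive later has to discover.
df.write \
    .partitionBy('year', 'month') \
    .parquet('s3://staging-dev/test/ttfourfieldspart2/')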

The parquet file schema printed by Spark:

>>> df7.schema
StructType(List(StructField(transactionid,StringType,true),StructField(eventts,TimestampType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true)))

>>> df7.printSchema()
root
 |-- transactionid: string (nullable = true)
 |-- eventts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

>>> df7.show(10)
+--------------------+--------------------+----+-----+
|       transactionid|             eventts|year|month|
+--------------------+--------------------+----+-----+
|f7018907-ed3d-49b...|2013-11-21 18:41:...|2013|   11|
|f6d95a5f-d4ba-489...|2013-11-21 18:41:...|2013|   11|
|02b2a715-6e15-4bb...|2013-11-21 18:41:...|2013|   11|
|0e908c0f-7d63-48c...|2013-11-21 18:41:...|2013|   11|
|f83e30f9-950a-4b9...|2013-11-21 18:41:...|2013|   11|
|3425e4ea-b715-476...|2013-11-21 18:41:...|2013|   11|
|a20a6aeb-da4f-4fd...|2013-11-21 18:41:...|2013|   11|
|d2f57e6f-889b-49b...|2013-11-21 18:41:...|2013|   11|
|46f2eda5-408e-44e...|2013-11-21 18:41:...|2013|   11|
|36fb8b79-b2b5-493...|2013-11-21 18:41:...|2013|   11|
+--------------------+--------------------+----+-----+
only showing top 10 rows

The CREATE TABLE statement in Hive:

create external table if not exists t3(
  transactionid string,
  eventts timestamp)
partitioned by (year int, month int)
stored as parquet
location 's3://staging-dev/test/ttfourfieldspart2/';

When I try to select some rows in Hive, it doesn't return any rows:

hive> select * from t3 limit 10;
OK
Time taken: 0.027 seconds
hive> 

Solution

I finally found the problem. When you create a table in Hive for partitioned data that already exists in S3 or HDFS, you need to run a command to update the Hive metastore with the table's partition structure. Take a look here: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)

The command is:

MSCK REPAIR TABLE table_name;


And on Hive running in Amazon EMR you can use:

ALTER TABLE table_name RECOVER PARTITIONS;
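
If everything is driven from PySpark rather than the Hive CLI, the same repair can also be issued through a HiveContext. This is only a sketch, under the assumptions that sc is the existing SparkContext, the external table t3 was created as shown above, and the HiveContext passes MSCK REPAIR TABLE through to Hive:

from pyspark.sql import HiveContext

hiveContext = HiveContext(sc)

# Ask Hive to scan the table location and register the existing
# year=.../month=... directories in the metastore.
hiveContext.sql("MSCK REPAIR TABLE t3")

# The partitions should now be visible to Hive queries as well.
hiveContext.sql("SELECT * FROM t3 LIMIT 10").show()

After the repair, SHOW PARTITIONS t3 in Hive should list year=2013/month=11, and the SELECT from the question should return rows.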
