pyspark - getting Latest partition from Hive partitioned column logic


Problem description


I am new to PySpark. I am trying to get the latest partition (date partition) of a Hive table using PySpark DataFrames, and I have done it as shown below. But I am sure there is a better way to do it using DataFrame functions (not by writing SQL). Could you please share inputs on better ways?

This solution scans the entire data of the Hive table to get it.

# Load the whole table, then sort its distinct partition dates to find the latest one
df_1 = sqlContext.table("dbname.tablename")

df_1_dates = df_1.select('partitioned_date_column').distinct().orderBy(df_1['partitioned_date_column'].desc())

lat_date_dict = df_1_dates.first().asDict()

lat_dt = lat_date_dict['partitioned_date_column']

Solution

I agree with what @philantrovert mentioned in the comment. You can use the approach below for partition pruning to limit the number of partitions scanned for your Hive table.

>>> spark.sql("""show partitions test_dev_db.newpartitiontable""").show();
+--------------------+
|           partition|
+--------------------+
|tran_date=2009-01-01|
|tran_date=2009-02-01|
|tran_date=2009-03-01|
|tran_date=2009-04-01|
|tran_date=2009-05-01|
|tran_date=2009-06-01|
|tran_date=2009-07-01|
|tran_date=2009-08-01|
|tran_date=2009-09-01|
|tran_date=2009-10-01|
|tran_date=2009-11-01|
|tran_date=2009-12-01|
+--------------------+

>>> max_date=spark.sql("""show partitions test_dev_db.newpartitiontable""").rdd.flatMap(lambda x:x).map(lambda x : x.replace("tran_date=","")).max()
>>> print max_date
2009-12-01
>>> query = "select city,state,country from test_dev_db.newpartitiontable where tran_date ='{}'".format(max_date)

>>> spark.sql(query).show();
+--------------------+----------------+--------------+
|                city|           state|       country|
+--------------------+----------------+--------------+
|         Southampton|         England|United Kingdom|
|W Lebanon        ...|              NH| United States|
|               Comox|British Columbia|        Canada|
|           Gasperich|      Luxembourg|    Luxembourg|
+--------------------+----------------+--------------+

>>> spark.sql(query).explain(True)
== Parsed Logical Plan ==
'Project ['city, 'state, 'country]
+- 'Filter ('tran_date = 2009-12-01)
   +- 'UnresolvedRelation `test_dev_db`.`newpartitiontable`

== Analyzed Logical Plan ==
city: string, state: string, country: string
Project [city#9, state#10, country#11]
+- Filter (tran_date#12 = 2009-12-01)
   +- SubqueryAlias newpartitiontable
      +- Relation[city#9,state#10,country#11,tran_date#12] orc

== Optimized Logical Plan ==
Project [city#9, state#10, country#11]
+- Filter (isnotnull(tran_date#12) && (tran_date#12 = 2009-12-01))
   +- Relation[city#9,state#10,country#11,tran_date#12] orc

== Physical Plan ==
*(1) Project [city#9, state#10, country#11]
+- *(1) FileScan orc test_dev_db.newpartitiontable[city#9,state#10,country#11,tran_date#12] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(tran_date#12), (tran_date#12 = 2009-12-01)], PushedFilters: [], ReadSchema: struct<city:string,state:string,country:string>

You can see PartitionCount: 1 in the plan above: it has scanned only one of the 12 available partitions.
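
If you would rather stay in the DataFrame API instead of writing the final SQL query, the same pruning applies when you filter on the partition column. Below is a minimal sketch under that assumption, reusing the spark session, table, and column names from the example above:

from pyspark.sql import functions as F

# List partition values without reading any data files, then take the latest one.
partitions = spark.sql("show partitions test_dev_db.newpartitiontable")
max_date = (partitions
            .select(F.regexp_replace("partition", "tran_date=", "").alias("tran_date"))
            .agg(F.max("tran_date"))
            .collect()[0][0])

# An equality filter on the partition column is pushed down as a partition filter,
# so only the files of the latest partition are scanned.
latest_df = (spark.table("test_dev_db.newpartitiontable")
             .filter(F.col("tran_date") == max_date)
             .select("city", "state", "country"))
latest_df.show()

Running latest_df.explain(True) should show the same PartitionFilters and PartitionCount: 1 as the SQL version.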
