How to load only the data of the last partition


Problem description

I have some data partitioned this way:

/data/year=2016/month=9/version=0
/data/year=2016/month=10/version=0
/data/year=2016/month=10/version=1
/data/year=2016/month=10/version=2
/data/year=2016/month=10/version=3
/data/year=2016/month=11/version=0
/data/year=2016/month=11/version=1

When using this data, I'd like to load only the last version of each month.

A simple way to do this is to call load("/data/year=2016/month=11/version=3") instead of load("/data").
The drawback of this solution is the loss of partitioning information such as year and month, which means it would no longer be possible to apply operations based on the year or the month.
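
As a small PySpark sketch of that simple approach (paths taken from the example above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Loading one leaf directory reads only that version,
# but year/month/version are no longer part of the schema.
df_single = spark.read.load("/data/year=2016/month=11/version=3")

# Loading the root keeps the partition columns, but reads every version.
df_all = spark.read.load("/data")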

Is it possible to ask Spark to load only the last version of each month? How would you go about this?
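
For illustration, one way "last version per month" could be expressed on the fully partitioned load is sketched below; it keeps the year and month columns, although Spark would still have to read the files of every version, since the filter depends on a computed maximum (this is only an illustrative sketch, not the recommended answer below):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Load the whole tree so year/month/version remain available as partition columns.
df = spark.read.load("/data")

# Keep only the rows belonging to the highest version within each year/month.
w = Window.partitionBy("year", "month")
latest = (df.withColumn("max_version", F.max("version").over(w))
            .where(F.col("version") == F.col("max_version"))
            .drop("max_version"))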

Recommended answer

Just an addition to the previous answers.

I have the following ORC format table in Hive, which is partitioned on the year, month and day columns.

hive (default)> show partitions test_dev_db.partition_date_table;
OK
year=2019/month=08/day=07
year=2019/month=08/day=08
year=2019/month=08/day=09

If I set the properties below, I can read the latest partition data in Spark SQL as shown here:

spark.sql("SET spark.sql.orc.enabled=true");
spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
spark.sql("SET spark.sql.orc.filterPushdown=true")

spark.sql("""select * from test_dev_db.partition_date_table where year ='2019' and month='08' and day='07' """).explain(True)

We can see PartitionCount: 1 in the plan, and it is obvious that it has pruned down to the single requested partition.

== Physical Plan ==
*(1) FileScan orc test_dev_db.partition_date_table[emp_id#212,emp_name#213,emp_salary#214,emp_date#215,year#216,month#217,day#218] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(year#216), isnotnull(month#217), isnotnull(day#218), (year#216 = 2019), (month#217 = 0..., PushedFilters: [], ReadSchema: struct<emp_id:int,emp_name:string,emp_salary:int,emp_date:date>

The same will not work if I use the query below. Even if we create a dataframe using spark.read.format("orc").load(hdfs absolute path of table), create a temporary view and run Spark SQL on it, it will still scan all the partitions available for that table unless we apply a specific filter condition on top of it.

spark.sql("""select * from test_dev_db.partition_date_table where year ='2019' and month='08' and day in (select max(day) from test_dev_db.partition_date_table)""").explain(True)

It has still scanned all three partitions; here PartitionCount: 3.

== Physical Plan ==
*(2) BroadcastHashJoin [day#282], [max(day)#291], LeftSemi, BuildRight
:- *(2) FileScan orc test_dev_db.partition_date_table[emp_id#276,emp_name#277,emp_salary#278,emp_date#279,year#280,month#281,day#282] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 3, PartitionFilters: [isnotnull(year#280), isnotnull(month#281), (year#280 = 2019), (month#281 = 08)], PushedFilters: [], ReadSchema: struct<emp_id:int,emp_name:string,emp_salary:int,emp_date:date>
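
For completeness, a sketch of the temporary-view variant mentioned above (the load path and view name are placeholders, not the real table location); the behaviour is the same: without literal partition filters, every partition is still read.

# Placeholder path: the real HDFS location of the table would go here.
df = spark.read.format("orc").load("hdfs:///path/to/test_dev_db.db/partition_date_table")
df.createOrReplaceTempView("partition_date_vw")

# A subquery on the view prunes nothing either: all partitions are scanned.
spark.sql("""select * from partition_date_vw
             where year = '2019' and month = '08'
               and day in (select max(day) from partition_date_vw)""").explain(True)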

To filter the data based on the max partition using Spark SQL, we can use the approach below. It relies on partition pruning to limit the number of files and partitions that Spark reads when querying the Hive ORC table data.

# Grab the partition spec strings, e.g. 'year=2019/month=08/day=09'
rdd = spark.sql("""show partitions test_dev_db.partition_date_table""").rdd.flatMap(lambda x: x)
# Strip the column names and separators so each entry becomes ['2019', '08', '09']
newrdd = rdd.map(lambda x: x.replace("/", "")).map(lambda x: x.replace("year=", "")) \
            .map(lambda x: x.replace("month=", "-")).map(lambda x: x.replace("day=", "-")) \
            .map(lambda x: x.split('-'))
max_year = newrdd.map(lambda x: x[0]).max()
max_month = newrdd.map(lambda x: x[1]).max()
max_day = newrdd.map(lambda x: x[2]).max()
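
Roughly the same thing can be done without dropping to the RDD API; a sketch is below (the partition column name and the regexp patterns are assumptions, not taken from the answer above):

from pyspark.sql import functions as F

parts = spark.sql("show partitions test_dev_db.partition_date_table")
# Assumed: SHOW PARTITIONS exposes a single string column named 'partition'.
latest = (parts
          .select(F.regexp_extract("partition", r"year=(\d+)", 1).alias("year"),
                  F.regexp_extract("partition", r"month=(\d+)", 1).alias("month"),
                  F.regexp_extract("partition", r"day=(\d+)", 1).alias("day"))
          .orderBy("year", "month", "day", ascending=False)
          .first())
max_year, max_month, max_day = latest["year"], latest["month"], latest["day"]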

Prepare your query to filter the Hive partition table using these max values:

query = "select * from test_dev_db.partition_date_table where year ='{0}' and month='{1}' and day ='{2}'".format(max_year,max_month,max_day)

>>> spark.sql(query).show();
+------+--------+----------+----------+----+-----+---+
|emp_id|emp_name|emp_salary|  emp_date|year|month|day|
+------+--------+----------+----------+----+-----+---+
|     3|  Govind|    810000|2019-08-09|2019|   08| 09|
|     4|  Vikash|      5500|2019-08-09|2019|   08| 09|
+------+--------+----------+----------+----+-----+---+

spark.sql(query).explain(True)

If you look at the plan of this query, you can see that it has scanned only one partition of the given Hive table; here PartitionCount is 1.

== Optimized Logical Plan ==
Filter (((((isnotnull(day#397) && isnotnull(month#396)) && isnotnull(year#395)) && (year#395 = 2019)) && (month#396 = 08)) && (day#397 = 09))
+- Relation[emp_id#391,emp_name#392,emp_salary#393,emp_date#394,year#395,month#396,day#397] orc

== Physical Plan ==
*(1) FileScan orc test_dev_db.partition_date_table[emp_id#391,emp_name#392,emp_salary#393,emp_date#394,year#395,month#396,day#397] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(day#397), isnotnull(month#396), isnotnull(year#395), (year#395 = 2019), (month#396 = 0..., PushedFilters: [], ReadSchema: struct<emp_id:int,emp_name:string,emp_salary:int,emp_date:date>
