Spark subquery scans all partitions

Problem description

I have a Hive table that is partitioned by a 'date' field. I want to write a query that gets the data from the latest (max) partition.

spark.sql("select field from table where date_of = '2019-06-23'").explain(True)
vs
spark.sql("select field from table where date_of = (select max(date_of) from table)").explain(True)

Below are the physical plans of the two queries:

*(1) Project [qbo_company_id#120L]
+- *(1) FileScan parquet table[qbo_company_id#120L,date_of#157] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1, PartitionFilters: [isnotnull(date_of#157), (cast(date_of#157 as string) = 2019-06-23)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>

*(1) Project [qbo_company_id#1L]
+- *(1) Filter (date_of#38 = Subquery subquery0)
   :  +- Subquery subquery0
   :     +- *(2) HashAggregate(keys=[], functions=[max(date_of#76)], output=[max(date_of)#78])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(date_of#76)], output=[max#119])
   :              +- LocalTableScan [date_of#76]
   +- *(1) FileScan parquet table[qbo_company_id#1L,date_of#38] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1836, PartitionFilters: [isnotnull(date_of#38)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>
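
The difference is in the FileScan nodes. With the literal date, the comparison appears in PartitionFilters and the scan covers PartitionCount: 1. With the subquery, PartitionFilters holds only isnotnull(date_of#38), so the scan covers all 1836 partitions and the equality is applied afterwards by the Filter node above it. The sketch below pulls those two fields out of plan text so the two queries can be compared programmatically. The helper name scan_partition_info is hypothetical, and the regex assumes the plan format shown here, which may differ between Spark versions.

```python
import re
from typing import List, Tuple

# Matches the FileScan fields printed in the plans above, e.g.
# "PartitionCount: 1836, PartitionFilters: [isnotnull(date_of#38)]".
# Assumes the filter list itself contains no nested square brackets.
_SCAN_RE = re.compile(r"PartitionCount: (\d+), PartitionFilters: \[(.*?)\]")


def scan_partition_info(plan_text: str) -> List[Tuple[int, str]]:
    """Return (PartitionCount, PartitionFilters) for each FileScan in a plan string."""
    return [(int(count), filters) for count, filters in _SCAN_RE.findall(plan_text)]


# Usage with PySpark (assumes an active SparkSession named `spark`);
# DataFrame.explain() prints the plan, so it is captured from stdout here:
#   import contextlib, io
#   buf = io.StringIO()
#   with contextlib.redirect_stdout(buf):
#       spark.sql("select field from table where date_of = '2019-06-23'").explain()
#   print(scan_partition_info(buf.getvalue()))
```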

Recommended answer

Building on Ram's answer, there is a much simpler way to accomplish this that eliminates a lot of overhead by querying the Hive metastore directly, rather than executing a Spark-SQL query. No need to reinvent the wheel:

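import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
import scala.collection.JavaConverters._

// Build a HiveConf from Spark's Hadoop configuration and connect to the metastore
val hiveConf = new HiveConf(spark.sparkContext.hadoopConfiguration, classOf[HiveConf])
val cli = new HiveMetaStoreClient(hiveConf)
// Read partition metadata only (up to Short.MaxValue partitions); each partition's values
// are joined with "," and the lexicographic max is the latest yyyy-MM-dd date
val maxPart = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue)
  .asScala.map(_.getValues.asScala.mkString(",")).max
cli.close()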
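
The snippet above stops at computing maxPart; pruning only happens once that value reaches the query as a literal, as in the first plan in the question (PartitionCount: 1). In Scala that last step would be spark.sql(s"select field from table where date_of = '$maxPart'"). Since the question's code is PySpark, here is a Python sketch of the same two-step idea. It assumes the partition names come from Spark SQL's SHOW PARTITIONS (rows such as date_of=2019-06-23) instead of HiveMetaStoreClient, and the helper names latest_partition_value and pruned_query are hypothetical.

```python
from typing import Iterable


def latest_partition_value(partition_specs: Iterable[str], column: str) -> str:
    """Return the largest value of `column` among SHOW PARTITIONS-style specs.

    Each spec looks like "date_of=2019-06-23" or "date_of=2019-06-23/region=us".
    Values are compared as strings, which matches chronological order only for
    zero-padded formats such as yyyy-MM-dd.
    """
    values = []
    for spec in partition_specs:
        # Split "k1=v1/k2=v2" into {partition column: value}
        parts = dict(kv.split("=", 1) for kv in spec.split("/"))
        values.append(parts[column])
    if not values:
        raise ValueError("table has no partitions")
    return max(values)


def pruned_query(table: str, column: str, value: str, select_list: str = "field") -> str:
    """Build a query whose partition predicate is a literal, so Spark can prune
    partitions at planning time. `value` is assumed to be a trusted partition
    value that contains no single quotes."""
    return f"select {select_list} from {table} where {column} = '{value}'"


# Usage with PySpark (assumes an active SparkSession named `spark`):
#   specs = [row[0] for row in spark.sql("SHOW PARTITIONS table").collect()]
#   latest = latest_partition_value(specs, "date_of")
#   spark.sql(pruned_query("table", "date_of", latest)).explain()
```

Only the partition listing and the final query touch Spark; the max is computed on the driver from metadata, so no data files are read to find it.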
