Spark SubQuery scans the whole partition
Question
I have a Hive table that is partitioned by a 'date' field, and I want to write a query that fetches data from the latest (max) partition.
spark.sql("select field from table where date_of = '2019-06-23'").explain(True)
vs
spark.sql("select filed from table where date_of = (select max(date_of) from table)").explain(True)
Below are the physical plans of the two queries:
*(1) Project [qbo_company_id#120L]
+- *(1) FileScan parquet
table[qbo_company_id#120L,date_of#157] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1, PartitionFilters: [isnotnull(date_of#157), (cast(date_of#157 as string) = 2019-06-23)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>
*(1) Project [qbo_company_id#1L]
+- *(1) Filter (date_of#38 = Subquery subquery0)
: +- Subquery subquery0
: +- *(2) HashAggregate(keys=[], functions=[max(date_of#76)], output=[max(date_of)#78])
: +- Exchange SinglePartition
: +- *(1) HashAggregate(keys=[], functions=[partial_max(date_of#76)], output=[max#119])
: +- LocalTableScan [date_of#76]
+- *(1) FileScan parquet
table[qbo_company_id#1L,date_of#38] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1836, PartitionFilters: [isnotnull(date_of#38)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>
Why does the subquery scan all of the partitions instead of choosing only the latest one? Given the partition metadata that is already available, why can't Spark scan only the required partition?
Recommended answer
Building on Ram's answer, there is a much simpler way to accomplish this that eliminates a lot of overhead by querying the Hive metastore directly rather than executing a Spark SQL query. No need to reinvent the wheel:
import org.apache.hadoop.hive.conf.HiveConf
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

// Open a metastore client using the Spark session's Hadoop configuration
val hiveConf = new HiveConf(spark.sparkContext.hadoopConfiguration, classOf[HiveConf])
val cli = new HiveMetaStoreClient(hiveConf)

// List all partitions of the table and keep the largest partition value
val maxPart = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue).asScala.map(_.getValues.asScala.mkString(",")).max
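Once the latest partition value has been fetched from the metastore, it can be substituted into the query as a plain literal, so the optimizer prunes down to a single partition exactly as in the first plan above. A minimal sketch, assuming date_of is the only partition column (table and column names are the placeholders from the question):

val df = spark.sql(s"select field from table where date_of = '$maxPart'")
// The plan should now show PartitionCount: 1, just like the hard-coded date
df.explain(true)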