Why does Apache Spark read unnecessary Parquet columns within nested structures?
Question
My team is building an ETL process to load raw delimited text files into a Parquet-based "data lake" using Spark. One of the promises of the Parquet column store is that a query will read only the necessary "column stripes".
But we're seeing unexpected columns being read for nested schema structures.
To demonstrate, here is a POC using Scala and the Spark 2.0.1 shell:
// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
  StructField("F1", IntegerType),
  StructField("F2", IntegerType),
  StructField("Orig", StructType(Seq(
    StructField("F1", StringType),
    StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
  sc.parallelize(Seq(
    Row(1, 2, Row("1", "2")),
    Row(3, null, Row("3", "ABC")))),
  schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")
Then we read the file back into a DataFrame and project to a subset of columns:
// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")
// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show
When this runs, we see the expected output:
+---+-------+
| F1|Orig_F1|
+---+-------+
| 1| 1|
| 3| 3|
+---+-------+
But... the query plan shows a slightly different story:
The "Optimized Plan" shows:
val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet
And "explain" shows:
projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>
And the INFO logs produced during execution also confirm that the Orig.F2 column is unexpectedly read:
16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema {
  optional int32 F1;
  optional group Orig {
    optional binary F1 (UTF8);
    optional binary F2 (UTF8);
  }
}

Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))
According to the Dremel paper and the Parquet documentation, columns for complex nested structures should be independently stored and independently retrievable.
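(One way to check that Parquet itself can serve just the pruned leaf is to hand Spark an explicit, already-pruned read schema. The sketch below is an untested workaround idea, not part of the original question: `spark.read.schema(...)` asks Spark to clip the file schema to the requested one, so `Orig.F2` should be dropped from the ReadSchema. The `prunedSchema` and `dfPruned` names are illustrative.)

```scala
// Workaround sketch: request a read schema that already omits Orig.F2.
// Spark clips the Parquet file schema to the requested schema, so only
// the listed leaf columns should be materialized.
val prunedSchema = StructType(Seq(
  StructField("F1", IntegerType),
  StructField("Orig", StructType(Seq(
    StructField("F1", StringType))))))

val dfPruned = spark.read.schema(prunedSchema).parquet("data.parquet")
dfPruned.select($"F1", $"Orig.F1".as("Orig_F1")).show
```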
Questions:
- Is this behavior a limitation of the current Spark query engine? In other words, does Parquet support executing this query optimally, but Spark's query planner is naive?
- Or is this a limitation of the current Parquet implementation?
- Or am I not using the Spark APIs correctly?
- Or am I misunderstanding how Dremel/Parquet column storage is supposed to work?
Possibly related: Why does query performance differ with nested columns in Spark SQL?
Answer
It's a limitation of the Spark query engine at the moment; the relevant JIRA ticket is below. Spark only handles predicate pushdown of simple types in Parquet, not nested StructTypes:
https://issues.apache.org/jira/browse/SPARK-17636
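(Editor's note for later readers: the broader nested-column pruning issue was tracked as SPARK-4502 and addressed in Spark 2.4, which added an experimental optimizer rule behind a config flag; the flag is enabled by default from Spark 3.0. The sketch below assumes one of those versions and has no effect on Spark 2.0.1 used in the question.)

```scala
// Available from Spark 2.4 (experimental, off by default there;
// on by default since Spark 3.0). No effect on Spark 2.0.1.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

val df = spark.read.parquet("data.parquet")
df.select($"F1", $"Orig.F1".as("Orig_F1")).explain
// With pruning active, the scan's ReadSchema should no longer include Orig.F2.
```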