Why does Apache Spark read unnecessary Parquet columns within nested structures?


Question

My team is building an ETL process to load raw delimited text files into a Parquet-based "data lake" using Spark. One of the promises of the Parquet column store is that a query will only read the necessary "column stripes".

But we're seeing unexpected columns being read for nested schema structures.

To demonstrate, here is a POC using Scala and the Spark 2.0.1 shell:

// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
    StructField("F1", IntegerType),
    StructField("F2", IntegerType),
    StructField("Orig", StructType(Seq(
        StructField("F1", StringType),
        StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
    sc.parallelize(Seq(
        Row(1, 2, Row("1", "2")),
        Row(3, null, Row("3", "ABC")))),
    schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")

Then we read the file back into a DataFrame and project to a subset of the columns:

// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")

// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show

When this runs we see the expected output:

+---+-------+
| F1|Orig_F1|
+---+-------+
|  1|      1|
|  3|      3|
+---+-------+

But... the query plan shows a slightly different story:

The "optimized plan" shows:

val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet

And "explain" shows:

projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>

And the INFO logs produced during execution also confirm that the Orig.F2 column is unexpectedly read:

16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema {
  optional int32 F1;
  optional group Orig {
    optional binary F1 (UTF8);
    optional binary F2 (UTF8);
  }
}

Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))

According to the Dremel paper and the Parquet documentation, columns of complex nested structures should be independently stored and independently retrievable.
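
For contrast, here is a minimal sketch (my own addition, assuming the same shell session and imports as above; the file name flat.parquet is hypothetical) showing that pruning behaves as expected when the same fields are stored as top-level columns rather than inside a struct:

// For contrast: the same two fields stored flat rather than nested
val flatSchema = StructType(Seq(
    StructField("F1", IntegerType),
    StructField("F2", StringType)))

val flatData = spark.createDataFrame(
    sc.parallelize(Seq(Row(1, "1"), Row(3, "3"))),
    flatSchema)

flatData.write.mode(SaveMode.Overwrite).parquet("flat.parquet")

// Selecting only F1 produces a scan whose ReadSchema contains just F1,
// i.e. ReadSchema: struct<F1:int> -- F2 is not read
spark.read.parquet("flat.parquet").select($"F1").explain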

Questions:

  1. Is this behavior a limitation of the current Spark query engine? In other words, does Parquet support executing this query optimally, but Spark's query planner is naive?
  2. Or, is this a limitation of the current Parquet implementation?
  3. Or, am I not using the Spark APIs correctly?
  4. Or, am I misunderstanding how Dremel/Parquet column storage is supposed to work?

Possibly related: Why does query performance differ with nested columns in Spark SQL?

Answer

It's a limitation of the Spark query engine at the moment; the relevant JIRA ticket is below. Spark only handles predicate pushdown of simple types in Parquet, not nested StructTypes:

https://issues.apache.org/jira/browse/SPARK-17636
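
Until that ticket is resolved, one possible workaround (my own sketch, not part of the linked answer; the output path data_flat.parquet is hypothetical) is to flatten the nested struct into top-level columns when writing to the data lake, so that ordinary column pruning applies:

// Flatten the struct at write time so each field becomes a top-level column
data
  .select(
    $"F1",
    $"F2",
    $"Orig.F1".as("Orig_F1"),
    $"Orig.F2".as("Orig_F2"))
  .write.mode(SaveMode.Overwrite).parquet("data_flat.parquet")

// A projection now scans only the requested column stripes
spark.read.parquet("data_flat.parquet").select($"F1", $"Orig_F1").explain
// Expect ReadSchema: struct<F1:int,Orig_F1:string>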
