Spark:强制读取模式时,Parquet DataFrame操作失败 [英] Spark: Parquet DataFrame operations fail when forcing schema on read
问题描述
(火花2.0.2)
当您有具有不同架构的镶木文件并在读取过程中强制架构时,这里的问题就会出现.即使可以打印架构并运行show()
ok,也不能在缺少的列上应用任何过滤逻辑.
The problem here rises when you have parquet files with different schema and force the schema during read. Even though you can print the schema and run show()
ok, you cannot apply any filtering logic on the missing columns.
以下是两个示例架构:
// assuming you are running this code in a spark REPL
import spark.implicits._
case class Foo(i: Int)
case class Bar(i: Int, j: Int)
因此,Bar
包含Foo
的所有字段,并再添加一个(j
).在现实生活中,当您从架构Foo
开始,后来又决定需要更多字段并以架构Bar
结尾时,就会出现这种情况.
So Bar
includes all the fields of Foo
and adds one more (j
). In real-life this arises when you start with schema Foo
and later decided that you needed more fields and end up with schema Bar
.
让我们模拟两个不同的实木复合地板文件.
Let's simulate the two different parquet files.
// assuming you are on a Mac or Linux OS
spark.createDataFrame(Foo(1)::Nil).write.parquet("/tmp/foo")
spark.createDataFrame(Bar(1,2)::Nil).write.parquet("/tmp/bar")
我们这里想要的是始终使用更通用的架构Bar
读取数据.也就是说,写在架构Foo
上的行应具有j
为空.
What we want here is to always read data using the more generic schema Bar
. That is, rows written on schema Foo
should have j
to be null.
案例1:我们同时阅读了两种模式
case 1: We read a mix of both schema
spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").show()
+---+----+
| i| j|
+---+----+
| 1| 2|
| 1|null|
+---+----+
spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").filter($"j".isNotNull).show()
+---+---+
| i| j|
+---+---+
| 1| 2|
+---+---+
案例2:我们只有酒吧数据
case 2: We only have Bar data
spark.read.parquet("/tmp/bar").show()
+---+---+
| i| j|
+---+---+
| 1| 2|
+---+---+
案例3:我们只有Foo数据
case 3: We only have Foo data
scala> spark.read.parquet("/tmp/foo").show()
+---+
| i|
+---+
| 1|
+---+
有问题的情况是3,其中我们得到的模式为Foo
类型,而不是Bar
类型.由于我们迁移到架构Bar
,因此我们希望始终从数据(旧数据和新数据)中获取架构Bar
.
The problematic case is 3, where our resulting schema is of type Foo
and not of Bar
. Since we migrate to schema Bar
, we want to always get schema Bar
from our data (old and new).
建议的解决方案是以编程方式将架构定义为始终为Bar
.让我们看看如何做到这一点:
The suggested solution would be to define the schema programmatically to always be Bar
. Let's see how to do this:
val barSchema = org.apache.spark.sql.Encoders.product[Bar].schema
//barSchema: org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false))
运行show()效果很好:
Running show() works great:
scala> spark.read.schema(barSchema).parquet("/tmp/foo").show()
+---+----+
| i| j|
+---+----+
| 1|null|
+---+----+
但是,如果您尝试对缺少的列j进行过滤,那么事情将会失败.
However, if you try to filter on the missing column j, things fail.
scala> spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()
17/09/07 18:13:50 ERROR Executor: Exception in task 0.0 in stage 230.0 (TID 481)
java.lang.IllegalArgumentException: Column [j] was not found in schema!
at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:381)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:355)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
推荐答案
对我有用的是将createDataFrame
API与RDD[Row]
和新模式结合使用(至少新列可为空).>
What worked for me is to use the createDataFrame
API with RDD[Row]
and the new schema (which at least the new columns being nullable).
// Make the columns nullable (probably you don't need to make them all nullable)
val barSchemaNullable = org.apache.spark.sql.types.StructType(
barSchema.map(_.copy(nullable = true)).toArray)
// We create the df (but this is not what you want to use, since it still has the same issue)
val df = spark.read.schema(barSchemaNullable).parquet("/tmp/foo")
// Here is the final API that give a working DataFrame
val fixedDf = spark.createDataFrame(df.rdd, barSchemaNullable)
fixedDf.filter($"j".isNotNull).show()
+---+---+
| i| j|
+---+---+
+---+---+
这篇关于Spark:强制读取模式时,Parquet DataFrame操作失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!