Spark: Parquet DataFrame operations fail when forcing schema on read


Question

(Spark 2.0.2)

The problem arises when you have Parquet files with different schemas and force a schema during read. Even though you can print the schema and run show() fine, you cannot apply any filtering logic on the missing columns.

Here are the two example schemas:

// assuming you are running this code in a spark REPL
import spark.implicits._

case class Foo(i: Int)
case class Bar(i: Int, j: Int) 

So Bar includes all the fields of Foo and adds one more (j). In real life this happens when you start with schema Foo and later decide you need more fields, ending up with schema Bar.

Let's simulate the two different Parquet files.

// assuming you are on a Mac or Linux OS
spark.createDataFrame(Foo(1)::Nil).write.parquet("/tmp/foo")
spark.createDataFrame(Bar(1,2)::Nil).write.parquet("/tmp/bar")
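
As a quick sanity check (a small addition, not in the original question), you can print the schema of each directory to confirm that the files on disk really carry different schemas:

// optional sanity check: the two directories were written with different schemas
spark.read.parquet("/tmp/foo").printSchema()   // only column i
spark.read.parquet("/tmp/bar").printSchema()   // columns i and j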

What we want here is to always read the data using the more generic schema Bar. That is, rows written with schema Foo should have j as null.

case 1: We read a mix of both schemas

spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").show()
+---+----+
|  i|   j|
+---+----+
|  1|   2|
|  1|null|
+---+----+


spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").filter($"j".isNotNull).show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

case 2: We only have Bar data

spark.read.parquet("/tmp/bar").show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

case 3: We only have Foo data

scala> spark.read.parquet("/tmp/foo").show()
+---+
|  i|
+---+
|  1|
+---+

The problematic case is 3, where the resulting schema is of type Foo and not Bar. Since we migrated to schema Bar, we want to always get schema Bar from our data (both old and new).

The suggested solution is to define the schema programmatically so that it is always Bar. Let's see how to do this:

val barSchema = org.apache.spark.sql.Encoders.product[Bar].schema
//barSchema: org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false)) 
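
Equivalently (a sketch, not part of the original post), the same schema could be written out by hand with StructType instead of deriving it from the case class encoder:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// hand-written equivalent of the encoder-derived barSchema above
val barSchemaManual = StructType(Seq(
  StructField("i", IntegerType, nullable = false),
  StructField("j", IntegerType, nullable = false)))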

Running show() works great:

scala> spark.read.schema(barSchema).parquet("/tmp/foo").show()
+---+----+
|  i|   j|
+---+----+
|  1|null|
+---+----+

However, if you try to filter on the missing column j, things fail:

scala> spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()
17/09/07 18:13:50 ERROR Executor: Exception in task 0.0 in stage 230.0 (TID 481)
java.lang.IllegalArgumentException: Column [j] was not found in schema!
    at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
    at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
    at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
    at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:381)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:355)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
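
The exception comes from Parquet filter pushdown: the predicate on j is pushed down to the Parquet reader and validated against the file's own schema, where j does not exist. One workaround that is sometimes suggested for this class of error (an assumption here, not verified in the original post) is to disable Parquet filter pushdown so the filter is evaluated by Spark against the forced schema:

// assumption: with pushdown disabled, the filter stays in Spark and sees j as null
spark.conf.set("spark.sql.parquet.filterPushdown", "false")
spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()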

Answer

What worked for me is to use the createDataFrame API with an RDD[Row] and the new schema (with at least the new columns made nullable).

// Make the columns nullable (probably you don't need to make them all nullable)
val barSchemaNullable = org.apache.spark.sql.types.StructType(
   barSchema.map(_.copy(nullable = true)).toArray)

// We create the df (but this is not what you want to use, since it still has the same issue)
val df = spark.read.schema(barSchemaNullable).parquet("/tmp/foo")

// Here is the final call that gives a working DataFrame
val fixedDf = spark.createDataFrame(df.rdd, barSchemaNullable)

fixedDf.filter($"j".isNotNull).show()

+---+---+
|  i|  j|
+---+---+
+---+---+
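
For completeness, a sketch (not shown in the original answer) of the same trick applied when the input mixes both directories; the row written with schema Bar should survive the filter:

// re-apply the forced, nullable schema through createDataFrame on the mixed input
val mixedDf = spark.createDataFrame(
  spark.read.schema(barSchemaNullable).parquet("/tmp/foo", "/tmp/bar").rdd,
  barSchemaNullable)
mixedDf.filter($"j".isNotNull).show()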
