Spark 2 Dataset Null value exception
Question

Getting this null error in Spark Dataset.filter.
Input CSV:
name,age,stat
abc,22,m
xyz,,s
Working code:
case class Person(name: String, age: Long, stat: String)
val peopleDS = spark.read.option("inferSchema","true")
.option("header", "true").option("delimiter", ",")
.csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()
Failing code (adding the following lines returns the error):
val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()
Null error returned:
java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
Accepted answer
The exception you get should explain everything, but let's go step-by-step.

When you load data using the csv data source, all fields are marked as nullable:
val path: String = ???
val peopleDF = spark.read
.option("inferSchema","true")
.option("header", "true")
.option("delimiter", ",")
.csv(path)
peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
Missing fields are represented as SQL NULL:
peopleDF.where($"age".isNull).show
+----+----+----+
|name| age|stat|
+----+----+----+
| xyz|null| s|
+----+----+----+
Next you convert Dataset[Row] to Dataset[Person], which uses Long to encode the age field. Long in Scala cannot be null. Because the input schema is nullable, the output schema stays nullable despite that:
val peopleDS = peopleDF.as[Person]
peopleDS.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
Note that as[T] doesn't affect the schema at all.
When you query the Dataset using SQL (on the registered table) or the DataFrame API, Spark won't deserialize the objects. Since the schema is still nullable, we can execute:
peopleDS.where($"age" > 30).show
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
without any issues. This is just plain SQL logic, and NULL is a valid value.
When we use the statically typed Dataset API:
peopleDS.filter(_.age > 30)
Spark has to deserialize the objects. Because Long cannot be null (SQL NULL), it fails with the exception you've seen. If it wasn't for that, you'd get an NPE.
The correct statically typed representation of your data should use Optional types:
case class Person(name: String, age: Option[Long], stat: String)
with an adjusted filter function:
peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
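The map(...).getOrElse(false) combination can also be written with Option.exists, which is equivalent for this predicate. A minimal pure-Scala sketch (no Spark required) to illustrate the equivalence:

```scala
// Option.exists(p) behaves like map(p).getOrElse(false):
// false for None, and the predicate applied to the value for Some.
val ages: Seq[Option[Long]] = Seq(Some(22L), None, Some(45L))

val viaExists    = ages.map(_.exists(_ > 30))
val viaGetOrElse = ages.map(_.map(_ > 30).getOrElse(false))

assert(viaExists == viaGetOrElse)
println(viaExists) // List(false, false, true)
```

So peopleDS.filter(_.age.exists(_ > 30)) is an equivalent, slightly shorter form of the filter above.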
If you prefer, you can use pattern matching:
peopleDS.filter {
  case Person(_, Some(age), _) => age > 30
  case _                       => false  // covers age == None
}
Note that you don't have to (though it is recommended anyway) use optional types for name and stat. Because Scala's String is just a Java String, it can be null. Of course, if you go with this approach you have to explicitly check whether accessed values are null or not.
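For the String fields, wrapping the possibly-null value in Option at the point of access is one way to make that check explicit. A small sketch (the null stat value here is made up for illustration):

```scala
case class Person(name: String, age: Option[Long], stat: String)

val p = Person("xyz", None, null) // stat arrived as SQL NULL

// Option(x) is Some(x) for non-null x and None for null,
// turning the implicit null check into an explicit one.
val stat = Option(p.stat).getOrElse("unknown")
assert(stat == "unknown")
```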
Related: Spark 2.0 Dataset vs DataFrame