Spark default null columns DataSet

Question

I cannot make Spark read a JSON (or CSV, for that matter) file as a Dataset of a case class with Option[_] fields when not all fields are defined in the source.
It's a bit cryptic, but let's say I have a case class called CustomData.

Given the following JSON file (customA.json):
{"id":123, "colA": "x", "colB": "z"}
{"id":456, "colA": "y"}
{"id":789, "colB": "a"}
And the following code:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .getOrCreate()

import spark.implicits._

case class CustomData(id: BigInt, colA: Option[String], colB: Option[String])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val ds = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customA.json")
  .as[CustomData]
  .show()
The output is, as expected:
+----+----+---+
|colA|colB| id|
+----+----+---+
| x| z|123|
| y|null|456|
|null| a|789|
+----+----+---+
Even though not all columns are always defined. But if I want to use the same code to read a file where one of the columns appears nowhere, I cannot make it happen:
For the other JSON file (customB.json):
{"id":321, "colA": "x"}
{"id":654, "colA": "y"}
{"id":987}
And the additional code:
val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .as[CustomData]
  .show()
The output is an error:

org.apache.spark.sql.AnalysisException: cannot resolve 'colB' given input columns: [colA, id];
Which makes sense, but I'd love to reuse the same case class for both files, especially if I don't know whether colB even appears in the JSON file before ingesting it.
Of course I can add checks, but is there a way to convert non-existing columns to null (as with customA.json)? Setting the read mode to PERMISSIVE doesn't seem to change anything.
Am I missing something?
Answer
I'll put an answer down here to show you what (sort of) works, but it looks very hacky IMHO.
By extending the DataFrame with a method that forces the StructType of the case class on top of the already existing StructType, it actually works, but maybe (I really hope) there are better/cleaner solutions.
Here it is:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import scala.reflect.runtime.universe._

case class DataFrameExtended(dataFrame: DataFrame) {
  def forceMergeSchema[T: TypeTag]: DataFrame = {
    // Derive the StructType from the case class, keep only the fields
    // missing from the DataFrame, and add each as a typed null column.
    ScalaReflection
      .schemaFor[T]
      .dataType
      .asInstanceOf[StructType]
      .filterNot(
        field => dataFrame.columns.contains(field.name)
      )
      .foldLeft(dataFrame) {
        case (newDf, field) => newDf.withColumn(field.name, lit(null).cast(field.dataType))
      }
  }
}

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
  DataFrameExtended(df)
}
val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .forceMergeSchema[CustomData]
  .as[CustomData]
  .show()
This now shows the result I was hoping for:
+----+---+----+
|colA| id|colB|
+----+---+----+
| x|321|null|
| y|654|null|
|null|987|null|
+----+---+----+
I've tried this only with scalar types like Int, String, etc. I think more complex structures will fail horribly, so I'm still looking for a better answer.
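A possibly cleaner alternative (not part of the original answer; sketched here on the assumption that the reader supports an explicit schema, as in Spark 2.x) is to derive the schema from the case class encoder and hand it to the reader up front. Columns absent from the JSON are then read as null without any post-processing:

```scala
import org.apache.spark.sql.Encoders

// Derive the expected schema from the case class instead of reflection.
val expectedSchema = Encoders.product[CustomData].schema

val ds3 = spark
  .read
  .schema(expectedSchema) // fields missing in the JSON come back as null
  .json("src/main/resources/customB.json")
  .as[CustomData]

ds3.show()
```

This avoids the implicit-extension machinery entirely, though it still shares the caveat above: nested structures and type mismatches would need separate testing.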