Spark default null columns DataSet
Problem description
I cannot make Spark read a JSON file (or a CSV, for that matter) as a Dataset of a case class with Option[_] fields where not all fields are defined in the source.
It's a bit cryptic, but let's say I have a case class called CustomData. Given the following JSON file (customA.json):
{"id":123, "colA": "x", "colB": "z"}
{"id":456, "colA": "y"}
{"id":789, "colB": "a"}
And the following code:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .getOrCreate()

import spark.implicits._

case class CustomData(id: BigInt, colA: Option[String], colB: Option[String])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val ds = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customA.json")
  .as[CustomData]
  .show()
The output is, as expected:
+----+----+---+
|colA|colB| id|
+----+----+---+
| x| z|123|
| y|null|456|
|null| a|789|
+----+----+---+
Even though not all columns are always defined. But if I want to use the same code to read a file where one of the columns appears nowhere, I cannot make it happen:
For the other JSON file (customB.json):
{"id":321, "colA": "x"}
{"id":654, "colA": "y"}
{"id":987}
And the additional code:
val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .as[CustomData]
  .show()
The output is an error:
org.apache.spark.sql.AnalysisException: cannot resolve 'colB' given input columns: [colA, id];
Which makes sense, but I'd love to reuse the same case class for both files, especially if I don't know whether colB even appears in the JSON file before ingesting it.
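The inferred schema makes the cause visible; a quick check like the one below should show that only the fields actually present in the data were picked up (the commented lines are roughly what inference prints here):

spark.read.json("src/main/resources/customB.json").printSchema()
// root
//  |-- colA: string (nullable = true)
//  |-- id: long (nullable = true)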
Of course I can add checks, but is there a way to convert non-existing columns to null (as with customA.json)? Setting the read mode to PERMISSIVE doesn't seem to change anything.
Am I missing something?
I'll put an answer down here to show you what (sort of) works, but it looks very hacky IMHO.
Extending the DataFrame with a method that forces the case class's StructType on top of the already existing StructType actually works, but maybe (I really hope) there are better / cleaner solutions.
Here goes:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import scala.reflect.runtime.universe._

case class DataFrameExtended(dataFrame: DataFrame) {
  def forceMergeSchema[T: TypeTag]: DataFrame = {
    // Derive the StructType of the case class T via reflection, keep only
    // the fields that are missing from the DataFrame, and add each of them
    // as a null column cast to the type the case class expects.
    ScalaReflection
      .schemaFor[T]
      .dataType
      .asInstanceOf[StructType]
      .filterNot(
        field => dataFrame.columns.contains(field.name)
      )
      .foldLeft(dataFrame) {
        case (newDf, field) => newDf.withColumn(field.name, lit(null).cast(field.dataType))
      }
  }
}

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
  DataFrameExtended(df)
}
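As an aside, the case-class-plus-implicit-def pair could be collapsed into a single implicit class; a purely stylistic sketch, meant to replace the pair above rather than coexist with it (the method body is unchanged):

implicit class DataFrameExtendedOps(dataFrame: DataFrame) {
  // Same logic as forceMergeSchema above, just brought into scope
  // without a separate implicit conversion.
  def forceMergeSchema[T: TypeTag]: DataFrame =
    ScalaReflection
      .schemaFor[T]
      .dataType
      .asInstanceOf[StructType]
      .filterNot(field => dataFrame.columns.contains(field.name))
      .foldLeft(dataFrame) { (df, field) =>
        df.withColumn(field.name, lit(null).cast(field.dataType))
      }
}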
val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .forceMergeSchema[CustomData]
  .as[CustomData]
  .show()
Now it shows the result I was hoping for:
+----+---+----+
|colA| id|colB|
+----+---+----+
| x|321|null|
| y|654|null|
|null|987|null|
+----+---+----+
I've tried this only with scalar types (Int, String, etc.); I think more complex structures will fail horribly, so I'm still looking for a better answer.
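For what it's worth, a cleaner route may be to skip schema inference altogether and hand the reader a schema derived from the case class up front; fields absent from the JSON then come back as null without any post-hoc column patching. A minimal sketch of that idea (expectedSchema and ds3 are just illustrative names):

import org.apache.spark.sql.Encoders

// Derive the expected schema from the case class itself, so the reader
// does not have to infer it from the (possibly incomplete) data.
val expectedSchema = Encoders.product[CustomData].schema

val ds3 = spark
  .read
  .schema(expectedSchema)
  .json("src/main/resources/customB.json")
  .as[CustomData]

ds3.show()

Since the schema is fixed before reading, this sidesteps the reflection fold entirely, though I have not tested how it behaves with nested structures either.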