Spark default null columns DataSet


Problem description

I cannot make Spark read a json (or csv, for that matter) as a Dataset of a case class with Option[_] fields where not all fields are defined in the source.

It's a bit cryptic, but let's say I have a case class called CustomData.

Given the following json file (customA.json):

{"id":123, "colA": "x", "colB": "z"}
{"id":456, "colA": "y"}
{"id":789,              "colB": "a"}

And the following code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .getOrCreate()

import spark.implicits._

case class CustomData(id: BigInt, colA: Option[String], colB: Option[String])
// Register the enclosing scope so the encoder can construct CustomData when the case class is defined inside another class or a REPL
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val ds = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customA.json")
  .as[CustomData]
  .show()

The output is, as expected:

+----+----+---+
|colA|colB| id|
+----+----+---+
|   x|   z|123|
|   y|null|456|
|null|   a|789|
+----+----+---+

Even though not all columns are always defined. But if I want to use the same code to read a file where one of the columns appears nowhere, I cannot make it work:

For the other json file (customB.json):

{"id":321, "colA": "x"}
{"id":654, "colA": "y"}
{"id":987}

And the additional code:

val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .as[CustomData]
  .show()

The output is an error:

org.apache.spark.sql.AnalysisException: cannot resolve 'colB' given input columns: [colA, id];

Which makes sense, but I'd love to reuse the same case class for both files, especially if I don't know whether colB even appears in the json file before ingesting it.

Of course I can make checks, but is there a way to convert non-existing columns to null (as with customA.json)? Setting the read mode to PERMISSIVE doesn't seem to change anything.
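
For illustration, the kind of manual check meant above might look like this. A minimal sketch only: it assumes colB is the only column that may be missing and that it is a nullable string, both taken from the example data, and the variable names are illustrative:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

val raw = spark.read.json("src/main/resources/customB.json")

// If colB was not inferred from the file, add it as an all-null string column
val patched =
  if (raw.columns.contains("colB")) raw
  else raw.withColumn("colB", lit(null).cast(StringType))

patched.as[CustomData].show()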

Am I missing something?

Answer

I'll put an answer down here to show you what (sort of) works, but it looks very hacky IMHO.

By extending the DataFrame with a method that forces the StructType of the case class on top of the already existing StructType, it actually works, but maybe (I really hope) there are better / cleaner solutions.

Here it is:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import scala.reflect.runtime.universe._

case class DataFrameExtended(dataFrame: DataFrame) {

  /** Add any fields of T's schema that are missing from the DataFrame as null columns of the matching type. */
  def forceMergeSchema[T: TypeTag]: DataFrame = {
    ScalaReflection
      .schemaFor[T]
      .dataType
      .asInstanceOf[StructType]
      .filterNot(
        field => dataFrame.columns.contains(field.name)
      )
      .foldLeft(dataFrame){
        case (newDf, field) => newDf.withColumn(field.name, lit(null).cast(field.dataType))
      }
  }
}

// Make forceMergeSchema available on any DataFrame via an implicit conversion
implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
  DataFrameExtended(df)
}

val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .forceMergeSchema[CustomData]
  .as[CustomData]
  .show()

Now it shows the result I was hoping for:

+----+---+----+
|colA| id|colB|
+----+---+----+
|   x|321|null|
|   y|654|null|
|null|987|null|
+----+---+----+
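
As a quick sanity check (a sketch, not part of the original answer), collecting the same data as a typed Dataset should show the padded column coming back as None on the case-class side:

val typedRows = spark
  .read
  .json("src/main/resources/customB.json")
  .forceMergeSchema[CustomData]
  .as[CustomData]
  .collect()

// prints something like: CustomData(321,Some(x),None)
typedRows.foreach(println)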

I've only tried this with scalar types (Int, String, etc.); I think more complex structures will fail horribly. So I'm still looking for a better answer.
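
A possibly cleaner direction (a sketch based on standard Spark APIs, not something verified in the original thread) is to derive the expected schema from the case class with Encoders.product and hand it to the reader up front; columns missing from the JSON then come back as null without any post-processing:

import org.apache.spark.sql.Encoders

// Schema derived from CustomData; fields absent from the JSON are filled with null
// when this schema is supplied to the reader instead of being inferred
val expectedSchema = Encoders.product[CustomData].schema

val ds3 = spark
  .read
  .schema(expectedSchema)
  .json("src/main/resources/customB.json")
  .as[CustomData]

ds3.show()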

