Spark default null columns DataSet


Problem description



I cannot make Spark read JSON (or CSV, for that matter) into a Dataset of a case class with Option[_] fields when not all fields are defined in the source.

It's a bit cryptic, but let's say I have a case class called CustomData.

Given the following JSON file (customA.json):

{"id":123, "colA": "x", "colB": "z"}
{"id":456, "colA": "y"}
{"id":789,              "colB": "a"}

And the following code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .getOrCreate()

import spark.implicits._

case class CustomData(id: BigInt, colA: Option[String], colB: Option[String])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val ds = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customA.json")
  .as[CustomData]
  .show()

The output is - as expected - :

+----+----+---+
|colA|colB| id|
+----+----+---+
|   x|   z|123|
|   y|null|456|
|null|   a|789|
+----+----+---+

Even though not all columns are defined in every record, this works. But if I want to use the same code to read a file where one of the columns never appears at all, I cannot make it work:

For the other json file (customB.json):

{"id":321, "colA": "x"}
{"id":654, "colA": "y"}
{"id":987}

And the additional code:

val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .as[CustomData]
  .show()

The output is an error:

org.apache.spark.sql.AnalysisException: cannot resolve 'colB' given input columns: [colA, id];

Which makes sense, but I'd love to reuse the same case class for both files, especially if I don't know whether colB even appears in the JSON file before ingesting it.

Of course I can make checks, but is there a way to convert non-existent columns to null (as with customA.json)? Setting the read mode to PERMISSIVE doesn't seem to change anything.
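(To be concrete, the kind of check I mean is a sketch like the one below; the column name colB and its String type are just this example's, and it clearly doesn't scale once there are many optional columns.)

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

val raw = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")

// Hypothetical workaround: patch in colB as a null String column when the file lacks it.
val patched =
  if (raw.columns.contains("colB")) raw
  else raw.withColumn("colB", lit(null).cast(StringType))

patched.as[CustomData].show()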

Am I missing something?

Solution

I'll put an answer down here to show what (sort of) works, though it looks very hacky IMHO.

By extending the DataFrame with a method that forces the StructType of the case class on top of the already existing StructType, it actually works, but maybe (I really hope) there are better / cleaner solutions.

Here goes:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import scala.reflect.runtime.universe._

case class DataFrameExtended(dataFrame: DataFrame) {

  def forceMergeSchema[T: TypeTag]: DataFrame = {
    // Derive the StructType of the case class T, keep only the fields that are
    // missing from the underlying DataFrame, and append each one as a null
    // column cast to the expected type.
    ScalaReflection
      .schemaFor[T]
      .dataType
      .asInstanceOf[StructType]
      .filterNot(
        field => dataFrame.columns.contains(field.name)
      )
      .foldLeft(dataFrame){
        case (newDf, field) => newDf.withColumn(field.name, lit(null).cast(field.dataType))
      }
  }
}

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
  DataFrameExtended(df)
}

val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .forceMergeSchema[CustomData]
  .as[CustomData]
  .show()

This now shows the result I was hoping for:

+----+---+----+
|colA| id|colB|
+----+---+----+
|   x|321|null|
|   y|654|null|
|null|987|null|
+----+---+----+

I've only tried this with scalar types (Int, String, etc.); I think more complex structures will fail horribly. So I'm still looking for a better answer.
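One direction I haven't fully explored, so take it only as a sketch: derive the schema from the case class with the same ScalaReflection call and pass it to the reader up front. As far as I understand, Spark then fills columns that are absent from the file with null instead of dropping them from the inferred schema (the val names below are just illustrative):

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection

// Build the expected schema once from the case class instead of letting Spark infer it.
val expectedSchema = ScalaReflection
  .schemaFor[CustomData]
  .dataType
  .asInstanceOf[StructType]

val ds3 = spark
  .read
  .option("mode", "PERMISSIVE")
  .schema(expectedSchema) // columns missing from the file should come back as null
  .json("src/main/resources/customB.json")
  .as[CustomData]

ds3.show()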
