从UDF内的Spark SQL行提取嵌套数组 [英] Extract a nested array from a Spark SQL Row inside a UDF

查看:96
本文介绍了从UDF内的Spark SQL行提取嵌套数组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用DataFrames,需要提取数据.我有很多嵌套级别,所以我先进行了爆炸和选择操作,但是随后我将UDF用于嵌套级别.

I'm working with DataFrames and need to extract data. I have many nested levels, so I did the first level with explodes and selects but then I use UDFs for nested levels.

我有一个使用 $"Root.Obj" 的UDF,它是一个数组,我希望它返回一个Array [MyObj].
我的输出类:

I have an UDF taking $"Root.Obj", which is an array, and I want it to return an Array[MyObj].
My output classes :

case class MyObj(fieldA: Boolean, fieldB: String, fieldC: Array[MyNested])
case class MyNested(field1: Long, field2: String)

简而言之,这是输入模式:

This is the input schema in short :

 |-- Obj: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- FieldA: boolean (nullable = true)
 |    |    |-- FieldB: string (nullable = true)
 |    |    |-- FieldC: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Field1: long (nullable = true)
 |    |    |    |    |-- Field2: string (nullable = true)
 |    |    |-- FieldD: boolean (nullable = true)

我的UDF:

def extractObjs: UserDefinedFunction = udf {
  (objs: Seq[Row]) ⇒
    objs.map {
      obj ⇒
        MyObj(
          obj.getAs[Boolean]("FieldA"),
          obj.getAs[String]("FieldB"),
          extractNested(obj.get???("FieldC"))
        )
    }
}

def extractNested(nesteds: ???): Array[MyNested] = {
  ???
}

这是更复杂的IRL,因为我需要从其他位置检索值,并且有更多的嵌套数组.而且,Obj和FieldC的输入结构比这里复杂得多,我无法(或不想)为它们创建案例类.因为我需要在多个地方进行此操作,所以说我不知道​​FieldC元素的结构".

This is more complicated IRL as I need to retrieve values from other places and there is more nested arrays. Moreover, the input structures of Obj and FieldC are far more complex than here and I can't (or don't want) create case class for them. As I'll need to do this in over places, let's say I don't know "the struct" of FieldC elements.

我的问题是提取"FieldC"数组.我想要一个Seq [Row],但是我无法实现,getStruct只给我一行,并且getSeq [Row]之后抛出错误,因为 scala.collection.mutable.WrappedArray $ ofRef无法转换到org.apache.spark.sql.Row .

My problem is to extract the "FieldC" array. I would like a Seq[Row] but I couldn't achieve to get that, getStruct give me just a Row, and getSeq[Row] is throwing error after because scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.Row.

推荐答案

结构映射到UDF中的 Row ,因此可以通过 Seq [Row] ,例如:

Structs map to Row in an UDF, so array of structs can be accessed by Seq[Row], such as :

def extractObjs: UserDefinedFunction = udf {
  (objs: Seq[Row]) ⇒
    objs.map {
      obj ⇒
        MyObj(
          obj.getAs[Boolean]("FieldA"),
          obj.getAs[String]("FieldB"),
          extractNested(obj.getAs[Seq[Row]]("FieldC"))
        )
    }
}

def extractNested(nesteds: Seq[Row]): Array[MyNested] = {
  nesteds.map(r => MyNested(r.getAs[Long]("Field1"),r.getAs[String]("Field2"))).toArray
}

这篇关于从UDF内的Spark SQL行提取嵌套数组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆