星火查询SQL数据框与复杂类型 [英] Querying Spark SQL DataFrame with complex types

查看:269
本文介绍了星火查询SQL数据框与复杂类型的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我如何查询复杂的类型,如地图/阵列的RDD?
例如,当我在写这个测试code:

How Can I query an RDD with complex types such as maps/arrays? for example, when I was writing this test code:

case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))

我虽然语法会是这样的:

I though the syntax would be something like:

sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")

sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")

但我得到

不能在类型的MapType访问嵌套的字段(StringType,StringType,真实)

Can't access nested field in type MapType(StringType,StringType,true)

org.apache.spark.sql.catalyst.errors.package $ TreeNodeException:未解决的属性。

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes

分别。

推荐答案

这取决于列的类型。让我们先从一些虚拟数据:

It depends on a type of the column. Lets start with some dummy data:

import org.apache.spark.sql.functions.{udf, lit}
import scala.util.Try

case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
  an_array: Array[Int], a_map: Map[String, String], 
  a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])


val df = sc.parallelize(Seq(
  Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
         Array(
           ArrayElement("foo", 1, Array(1.0, 2.0)),
           ArrayElement("bar", 2, Array(3.0, 4.0)))),
  Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
         Array(ArrayElement("foz", 3, Array(5.0, 6.0)), 
               ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF

df.registerTempTable("df")
df.printSchema

// root
// |-- an_array: array (nullable = true)
// |    |-- element: integer (containsNull = false)
// |-- a_map: map (nullable = true)
// |    |-- key: string
// |    |-- value: string (valueContainsNull = true)
// |-- a_struct: struct (nullable = true)
// |    |-- x: integer (nullable = false)
// |-- an_array_of_structs: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- foo: string (nullable = true)
// |    |    |-- bar: integer (nullable = false)
// |    |    |-- vals: array (nullable = true)
// |    |    |    |-- element: double (containsNull = false)


  • 数组列:

    • array columns:


      • Col​​umn.getItem

      df.select($"an_array".getItem(1)).show
      
      // +-----------+
      // |an_array[1]|
      // +-----------+
      // |          2|
      // |          5|
      // +-----------+
      


    • 蜂巢括号语法:

    • Hive brackets syntax:

      sqlContext.sql("SELECT an_array[1] FROM df").show
      
      // +---+
      // |_c0|
      // +---+
      // |  2|
      // |  5|
      // +---+
      


    • 这是UDF

    • an UDF

      val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption)
      
      df.select(get_ith($"an_array", lit(1))).show
      
      // +---------------+
      // |UDF(an_array,1)|
      // +---------------+
      // |              2|
      // |              5|
      // +---------------+
      


    • 图列


      • 使用 Col​​umn.getField 方法:

      df.select($"a_map".getField("foo")).show
      
      // +----------+
      // |a_map[foo]|
      // +----------+
      // |       bar|
      // |      null|
      // +----------+
      


    • 使用蜂巢括号语法:

    • using Hive brackets syntax:

      sqlContext.sql("SELECT a_map['foz'] FROM df").show
      
      // +----+
      // | _c0|
      // +----+
      // |null|
      // | baz|
      // +----+
      


    • 使用带有点语法完整路径:

    • using a full path with dot syntax:

      df.select($"a_map.foo").show
      
      // +----+
      // | foo|
      // +----+
      // | bar|
      // |null|
      // +----+
      


    • 使用UDF

    • using an UDF

      val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k))
      
      df.select(get_field($"a_map", lit("foo"))).show
      
      // +--------------+
      // |UDF(a_map,foo)|
      // +--------------+
      // |           bar|
      // |          null|
      // +--------------+
      


    • 使用带有点语法完整路径结构列:

      struct columns using full path with dot syntax:


      • 通过数据帧API

      • with DataFrame API

      df.select($"a_struct.x").show
      
      // +---+
      // |  x|
      // +---+
      // |  1|
      // |  2|
      // +---+
      


    • 与原始SQL

    • with raw SQL

      sqlContext.sql("SELECT a_struct.x FROM df").show
      
      // +---+
      // |  x|
      // +---+
      // |  1|
      // |  2|
      // +---+
      


    • 结构的数组字段可以使用点语法,名称和标准方法访问

      fields inside array of structs can be accessed using dot-syntax, names and standard Column methods:

      df.select($"an_array_of_structs.foo").show
      
      // +----------+
      // |       foo|
      // +----------+
      // |[foo, bar]|
      // |[foz, baz]|
      // +----------+
      
      sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
      
      // +---+
      // |_c0|
      // +---+
      // |foo|
      // |foz|
      // +---+
      
      df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show
      
      // +------------------------------+
      // |an_array_of_structs.vals[1][1]|
      // +------------------------------+
      // |                           4.0|
      // |                           8.0|
      // +------------------------------+
      


    • 用户定义类型(UDT)字段可以使用UDF的访问。请参见 SparkSQL引用UDT的详情属性。

      备注


      • 取决于星火版本的一些方法可以只用 HiveContext 。 UDF的应该工作的独立版本,采用标准的 SQLContext HiveContext

      • 一般来说嵌套的值是二等公民。并非所有的典型操作支持嵌套的领域。根据上下文它可以更好地扁平化的架构和/或爆炸集合

      • depending on a Spark version some of these methods can be available only with HiveContext. UDFs should work independent of version with both standard SQLContext and HiveContext.
      • generally speaking nested values are a second class citizens. Not all typical operations are supported on nested fields. Depending on a context it could be better to flatten the schema and / or explode collections

      df.select(explode($"an_array_of_structs")).show
      
      // +--------------------+
      // |                 col|
      // +--------------------+
      // |[foo,1,WrappedArr...|
      // |[bar,2,WrappedArr...|
      // |[foz,3,WrappedArr...|
      // |[baz,4,WrappedArr...|
      // +--------------------+
      


    • 这篇关于星火查询SQL数据框与复杂类型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆