Flattening JSON into Tabular Structure using Spark-Scala RDD-only functions


Problem Description

I have nested JSON and would like the output in a tabular structure. I am able to parse the JSON values individually, but am having some problems tabularizing it. I can do it easily via a DataFrame, but I want to do it using "RDD only" functions. Any help is much appreciated.

Input JSON:

  { "level":{"productReference":{  

     "prodID":"1234",

     "unitOfMeasure":"EA"

  },

  "states":[  
     {  
        "state":"SELL",
        "effectiveDateTime":"2015-10-09T00:55:23.6345Z",
        "stockQuantity":{  
           "quantity":1400.0,
           "stockKeepingLevel":"A"
        }
     },
     {  
        "state":"HELD",
        "effectiveDateTime":"2015-10-09T00:55:23.6345Z",
        "stockQuantity":{  
           "quantity":800.0,
           "stockKeepingLevel":"B"
        }
     }
  ] }}
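
Note that the code below reads the file with sc.textFile, which splits the input on newlines and parses each line as a separate JSON document, so the JSON above must be stored on a single line of product_rdd.json. If the file is pretty-printed as shown, sc.wholeTextFiles is one alternative (a sketch, not part of the original post):

   // Read (path, content) pairs so a multi-line JSON document stays intact,
   // then parse each whole file's content as one JSON value.
   val wholeJsonRDD = sc.wholeTextFiles("product_rdd.json")
     .map { case (_, content) => parse(content) }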

Expected output:

I tried the Spark code below. But I am getting output like this, and the Row() object is not able to parse it.

079562193,EA,List(SELLABLE, HELD),List(2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z),List(1400.0, 800.0),List(SINGLE, SINGLE)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods._

def main(Args : Array[String]): Unit = {

  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", StringType, true),
    StructField("effectiveDateTime", StringType, true),
    StructField("quantity", StringType, true),
    StructField("stockKeepingLevel", StringType, true)
  ))

  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")

  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {

    parse(eachJsonMessages)

  }).map(insideEachJson => {
    implicit val formats = org.json4s.DefaultFormats

    // the extracted key must match the JSON field ("prodID", not "TPNB")
    val prodID = (insideEachJson \ "level" \ "productReference" \ "prodID").extract[String]
    val unitOfMeasure = (insideEachJson \ "level" \ "productReference" \ "unitOfMeasure").extract[String]

    // each of these collapses the "states" array into the toString of a List
    val state = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "state").extract[String]).toString()
    val effectiveDateTime = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "effectiveDateTime").extract[String]).toString
    val quantity = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "quantity").extract[Double]).
      toString
    val stockKeepingLevel = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "stockKeepingLevel").extract[String]).
      toString

    //Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel)

    // println returns Unit, so this map produces an RDD[Unit]
    // that createDataFrame cannot consume
    println(prodID, unitOfMeasure, state, effectiveDateTime, quantity, stockKeepingLevel)

  }).collect()

  //  sqlContext.createDataFrame(x,salesSchema).show(truncate = false)

}

Recommended Answer

There are two versions of solutions to your question. In both, the key change is that the map returns a Row instead of printing it, so the result is an RDD[Row] that createDataFrame can consume.

Version 1:

// same imports as in the question code above

def main(Args : Array[String]): Unit = {

  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // every column is typed as String, so the List columns arrive as
  // their toString renderings, e.g. "List(SELL, HELD)"
  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", StringType, true),
    StructField("effectiveDateTime", StringType, true),
    StructField("quantity", StringType, true),
    StructField("stockKeepingLevel", StringType, true)
  ))

  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")

  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {

    parse(eachJsonMessages)

  }).map(insideEachJson => {
    implicit val formats = org.json4s.DefaultFormats

    val prodID = (insideEachJson \ "level" \ "productReference" \ "prodID").extract[String]
    val unitOfMeasure = (insideEachJson \ "level" \ "productReference" \ "unitOfMeasure").extract[String]

    val state = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "state").extract[String]).toString()
    val effectiveDateTime = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "effectiveDateTime").extract[String]).toString
    val quantity = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "quantity").extract[Double]).
      toString
    val stockKeepingLevel = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "stockKeepingLevel").extract[String]).
      toString

    // return a Row so the resulting RDD[Row] can feed createDataFrame
    Row(prodID, unitOfMeasure, state, effectiveDateTime, quantity, stockKeepingLevel)

  })

  sqlContext.createDataFrame(x, salesSchema).show(truncate = false)

}

This would give you the following output:

+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+
|prodID|unitOfMeasure|state           |effectiveDateTime                                         |quantity           |stockKeepingLevel|
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+
|1234  |EA           |List(SELL, HELD)|List(2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z)|List(1400.0, 800.0)|List(A, B)       |
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+

Version 2:

// same imports as in the question code above; ArrayType and DoubleType
// also come from org.apache.spark.sql.types._

def main(Args : Array[String]): Unit = {

  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // the per-state columns are typed as Arrays, so the extracted Lists
  // are kept as collections instead of being rendered as Strings
  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", ArrayType(StringType, true), true),
    StructField("effectiveDateTime", ArrayType(StringType, true), true),
    StructField("quantity", ArrayType(DoubleType, true), true),
    StructField("stockKeepingLevel", ArrayType(StringType, true), true)
  ))

  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")

  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {

    parse(eachJsonMessages)

  }).map(insideEachJson => {
    implicit val formats = org.json4s.DefaultFormats

    val prodID = (insideEachJson \ "level" \ "productReference" \ "prodID").extract[String]
    val unitOfMeasure = (insideEachJson \ "level" \ "productReference" \ "unitOfMeasure").extract[String]

    val state = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "state").extract[String])
    val effectiveDateTime = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "effectiveDateTime").extract[String])
    val quantity = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "quantity").extract[Double])
    val stockKeepingLevel = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "stockKeepingLevel").extract[String])

    Row(prodID, unitOfMeasure, state, effectiveDateTime, quantity, stockKeepingLevel)

  })

  sqlContext.createDataFrame(x, salesSchema).show(truncate = false)

}

This would give you the following output:

+------+-------------+------------+------------------------------------------------------+---------------+-----------------+
|prodID|unitOfMeasure|state       |effectiveDateTime                                     |quantity       |stockKeepingLevel|
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+
|1234  |EA           |[SELL, HELD]|[2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z]|[1400.0, 800.0]|[A, B]           |
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+

The difference between Version 1 and Version 2 is the schema. In Version 1 every column is cast to a String, whereas in Version 2 the per-state columns are cast to Arrays.
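
If what you want is a fully flattened table, with one row per element of states rather than Array columns, a flatMap variant along these lines should work (a sketch, not part of the original answer; it reuses Version 1's all-String salesSchema, so quantity is converted to a String):

   // emit one Row per state, repeating the product-level fields
   val flattened = ReadAlljsonMessageInFile_RDD.map(parse(_)).flatMap { json =>
     implicit val formats = org.json4s.DefaultFormats
     val prodID = (json \ "level" \ "productReference" \ "prodID").extract[String]
     val unitOfMeasure = (json \ "level" \ "productReference" \ "unitOfMeasure").extract[String]
     (json \ "level" \ "states").extract[List[JValue]].map { s =>
       Row(prodID, unitOfMeasure,
         (s \ "state").extract[String],
         (s \ "effectiveDateTime").extract[String],
         (s \ "stockQuantity" \ "quantity").extract[Double].toString,
         (s \ "stockQuantity" \ "stockKeepingLevel").extract[String])
     }
   }
   sqlContext.createDataFrame(flattened, salesSchema).show(truncate = false)

For the sample input this should yield two rows, one for SELL and one for HELD.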

