Flattening JSON into a tabular structure using only Spark-Scala RDD functions
Problem description
I have nested JSON and would like the output in a tabular structure. I am able to parse the JSON values individually, but I am having problems tabularizing them. I can do this easily with a DataFrame, but I want to do it using "RDD only" functions. Any help is much appreciated.
Input JSON:
{
  "level": {
    "productReference": {
      "prodID": "1234",
      "unitOfMeasure": "EA"
    },
    "states": [
      {
        "state": "SELL",
        "effectiveDateTime": "2015-10-09T00:55:23.6345Z",
        "stockQuantity": {
          "quantity": 1400.0,
          "stockKeepingLevel": "A"
        }
      },
      {
        "state": "HELD",
        "effectiveDateTime": "2015-10-09T00:55:23.6345Z",
        "stockQuantity": {
          "quantity": 800.0,
          "stockKeepingLevel": "B"
        }
      }
    ]
  }
}
Expected output:
I tried the Spark code below, but I get output like the following, and the Row() object is not able to parse it.
079562193,EA,List(SELLABLE, HELD),List(2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z),List(1400.0, 800.0),List(SINGLE, SINGLE)
def main(Args : Array[String]): Unit = {
  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", StringType, true),
    StructField("effectiveDateTime", StringType, true),
    StructField("quantity", StringType, true),
    StructField("stockKeepingLevel", StringType, true)
  ))
  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")
  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {
    parse(eachJsonMessages)
  }).map(insideEachJson=>{
    implicit val formats = org.json4s.DefaultFormats
    val prodID = (insideEachJson\ "level" \"productReference" \"TPNB").extract[String].toString
    val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString
    val state= (insideEachJson \ "level" \"states").extract[List[JValue]].
      map(x=>(x\"state").extract[String]).toString()
    val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]].
      map(x=>(x\"effectiveDateTime").extract[String]).toString
    val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]].
      map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double]).
      toString
    val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]].
      map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String]).
      toString
    //Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel)
    println(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel)
  }).collect()
  // sqlContext.createDataFrame(x,salesSchema).show(truncate = false)
}
Recommended answer
There are two versions of a solution to your question.
Version 1:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // Every column is declared as a String, so list values are stored as text.
  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", StringType, true),
    StructField("effectiveDateTime", StringType, true),
    StructField("quantity", StringType, true),
    StructField("stockKeepingLevel", StringType, true)
  ))

  // sc.textFile yields one record per line, so each JSON document must sit on a single line.
  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")
  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {
    parse(eachJsonMessages)
  }).map(insideEachJson => {
    implicit val formats = org.json4s.DefaultFormats
    val prodID = (insideEachJson \ "level" \ "productReference" \ "prodID").extract[String]
    val unitOfMeasure = (insideEachJson \ "level" \ "productReference" \ "unitOfMeasure").extract[String]
    // Extract the "states" array once and reuse it for every per-state field.
    val states = (insideEachJson \ "level" \ "states").extract[List[JValue]]
    // toString turns each list into text such as "List(SELL, HELD)".
    val state = states.map(s => (s \ "state").extract[String]).toString
    val effectiveDateTime = states.map(s => (s \ "effectiveDateTime").extract[String]).toString
    val quantity = states.map(s => (s \ "stockQuantity" \ "quantity").extract[Double]).toString
    val stockKeepingLevel = states.map(s => (s \ "stockQuantity" \ "stockKeepingLevel").extract[String]).toString
    Row(prodID, unitOfMeasure, state, effectiveDateTime, quantity, stockKeepingLevel)
  })
  sqlContext.createDataFrame(x, salesSchema).show(truncate = false)
}
This gives the following output:
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+
|prodID|unitOfMeasure|state           |effectiveDateTime                                         |quantity           |stockKeepingLevel|
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+
|1234  |EA           |List(SELL, HELD)|List(2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z)|List(1400.0, 800.0)|List(A, B)       |
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+
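The List(...) text in the Version 1 columns comes from calling toString on a Scala List, which produces the collection's printable form rather than its elements. The short sketch below illustrates this with plain Scala collections; the object name ListToStringDemo and the "|" separator are illustrative choices, not part of the original answer.

```scala
// Illustrative only: shows what toString and mkString return for a List.
object ListToStringDemo {
  val states: List[String] = List("SELL", "HELD")

  // toString keeps the "List(...)" wrapper, as seen in the Version 1 table.
  val viaToString: String = states.toString

  // mkString joins only the elements with the given separator.
  val viaMkString: String = states.mkString("|")
}
```

If a single String column is wanted without the List(...) wrapper, mkString with a separator of your choosing is one option.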
Version 2:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{ArrayType, DoubleType, StringType, StructField, StructType}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // The per-state columns are declared as arrays instead of strings.
  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", ArrayType(StringType, true), true),
    StructField("effectiveDateTime", ArrayType(StringType, true), true),
    StructField("quantity", ArrayType(DoubleType, true), true),
    StructField("stockKeepingLevel", ArrayType(StringType, true), true)
  ))

  // sc.textFile yields one record per line, so each JSON document must sit on a single line.
  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")
  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {
    parse(eachJsonMessages)
  }).map(insideEachJson => {
    implicit val formats = org.json4s.DefaultFormats
    val prodID = (insideEachJson \ "level" \ "productReference" \ "prodID").extract[String]
    val unitOfMeasure = (insideEachJson \ "level" \ "productReference" \ "unitOfMeasure").extract[String]
    // Extract the "states" array once and reuse it for every per-state field.
    val states = (insideEachJson \ "level" \ "states").extract[List[JValue]]
    // No toString here: the lists are passed to Row as collections.
    val state = states.map(s => (s \ "state").extract[String])
    val effectiveDateTime = states.map(s => (s \ "effectiveDateTime").extract[String])
    val quantity = states.map(s => (s \ "stockQuantity" \ "quantity").extract[Double])
    val stockKeepingLevel = states.map(s => (s \ "stockQuantity" \ "stockKeepingLevel").extract[String])
    Row(prodID, unitOfMeasure, state, effectiveDateTime, quantity, stockKeepingLevel)
  })
  sqlContext.createDataFrame(x, salesSchema).show(truncate = false)
}
This gives the following output:
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+
|prodID|unitOfMeasure|state       |effectiveDateTime                                     |quantity       |stockKeepingLevel|
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+
|1234  |EA           |[SELL, HELD]|[2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z]|[1400.0, 800.0]|[A, B]           |
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+
The difference between Version 1 and Version 2 is the schema. In Version 1 every column is a String, so each list of per-state values is converted to its String form, whereas in Version 2 those columns are kept as Arrays.
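Both versions keep one row per product, with the per-state values held together in a single cell. If one row per state is wanted instead, a possible approach that stays within RDD functions is to return a list of tuples per JSON record and apply it with flatMap. The following is only a sketch under stated assumptions: the names FlattenStates, FlatRow, flatten and FlattenJob are hypothetical, json4s-jackson is assumed to be on the classpath (it ships with Spark), and product_rdd.json is assumed to hold one complete JSON document per line.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Hypothetical helper, not from the original answer:
// turns one JSON record into one tuple per entry of "states".
object FlattenStates {
  type FlatRow = (String, String, String, String, Double, String)

  def flatten(json: String): List[FlatRow] = {
    implicit val formats: Formats = DefaultFormats
    val level = parse(json) \ "level"
    // Product-level fields are repeated on every output row.
    val prodID = (level \ "productReference" \ "prodID").extract[String]
    val unitOfMeasure = (level \ "productReference" \ "unitOfMeasure").extract[String]
    (level \ "states").extract[List[JValue]].map { s =>
      (prodID,
        unitOfMeasure,
        (s \ "state").extract[String],
        (s \ "effectiveDateTime").extract[String],
        (s \ "stockQuantity" \ "quantity").extract[Double],
        (s \ "stockQuantity" \ "stockKeepingLevel").extract[String])
    }
  }
}

// Hypothetical driver: wires the helper into an RDD pipeline.
object FlattenJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Flatten JSON with RDD only").setMaster("local[1]")
    val sc = new SparkContext(conf)
    // Assumption: each line of the file is one complete JSON document.
    val rows = sc.textFile("product_rdd.json").flatMap(line => FlattenStates.flatten(line))
    // Print each tuple as a comma-separated line.
    rows.map(_.productIterator.mkString(",")).collect().foreach(println)
    sc.stop()
  }
}
```

Keeping the parsing logic in a plain function that takes a String makes it easy to check without a cluster, and flatMap is what expands each input record into as many rows as it has states.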