Serialize table to nested JSON using Apache Spark
Question
I have a set of records like the following sample:
+---------+-------------+----------+
|ACCOUNTNO|VEHICLENUMBER|CUSTOMERID|
+---------+-------------+----------+
| 10003014|    MH43AJ411|  20000000|
| 10003014|    MH43AJ411|  20000001|
| 10003015|   MH12GZ3392|  20000002|
+---------+-------------+----------+
I want to convert these records into JSON that looks like this:
{
"ACCOUNTNO":10003014,
"VEHICLE": [
{ "VEHICLENUMBER":"MH43AJ411", "CUSTOMERID":20000000},
{ "VEHICLENUMBER":"MH43AJ411", "CUSTOMERID":20000001}
],
"ACCOUNTNO":10003015,
"VEHICLE": [
{ "VEHICLENUMBER":"MH12GZ3392", "CUSTOMERID":20000002}
]
}
I have written the following program, but it does not produce that output.
package com.report.pack1.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._

object sqltojson {
  def main(args:Array[String]) {
    System.setProperty("hadoop.home.dir", "C:/winutil/")
    val conf = new SparkConf().setAppName("SQLtoJSON").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val jdbcSqlConnStr = "jdbc:sqlserver://192.168.70.88;databaseName=ISSUER;user=bhaskar;password=welcome123;"
    val jdbcDbTable = "[HISTORY].[TP_CUSTOMER_PREPAIDACCOUNTS]"
    val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> jdbcSqlConnStr,"dbtable" -> jdbcDbTable)).load()
    jdbcDF.registerTempTable("tp_customer_account")
    val res01 = sqlContext.sql("SELECT ACCOUNTNO, VEHICLENUMBER, CUSTOMERID FROM tp_customer_account GROUP BY ACCOUNTNO, VEHICLENUMBER, CUSTOMERID ORDER BY ACCOUNTNO ")
    res01.coalesce(1).write.json("D:/res01.json")
  }
}
How can I serialize the data in the given format? Thanks in advance!
Recommended answer
You can use struct and groupBy to get the desired result. The code below does this; I have added comments where needed.
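// Imports needed outside spark-shell; `spark` is assumed to be your SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}
import spark.implicits._ // enables toDF on a Seq of tuples

val df = Seq(
  (10003014, "MH43AJ411", 20000000),
  (10003014, "MH43AJ411", 20000001),
  (10003015, "MH12GZ3392", 20000002)
).toDF("ACCOUNTNO", "VEHICLENUMBER", "CUSTOMERID")
df.show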
//output
//+---------+-------------+----------+
//|ACCOUNTNO|VEHICLENUMBER|CUSTOMERID|
//+---------+-------------+----------+
//| 10003014| MH43AJ411| 20000000|
//| 10003014| MH43AJ411| 20000001|
//| 10003015| MH12GZ3392| 20000002|
//+---------+-------------+----------+
// Create a struct column, group by ACCOUNTNO, and finally convert the DataFrame to JSON
df.withColumn("VEHICLE", struct("VEHICLENUMBER", "CUSTOMERID")).
  select("VEHICLE", "ACCOUNTNO"). // select only the required columns
  groupBy("ACCOUNTNO").
  agg(collect_list("VEHICLE").as("VEHICLE")). // collect the vehicles of each account into a list
  toJSON. // convert each row to a JSON string
  show(false)
//output
//+------------------------------------------------------------------------------------------------------------------------------------------+
//|value |
//+------------------------------------------------------------------------------------------------------------------------------------------+
//|{"ACCOUNTNO":10003014,"VEHICLE":[{"VEHICLENUMBER":"MH43AJ411","CUSTOMERID":20000000},{"VEHICLENUMBER":"MH43AJ411","CUSTOMERID":20000001}]}|
//|{"ACCOUNTNO":10003015,"VEHICLE":[{"VEHICLENUMBER":"MH12GZ3392","CUSTOMERID":20000002}]} |
//+------------------------------------------------------------------------------------------------------------------------------------------+
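You can also write this result to a file using the same write.json statement you used in the question. Since write.json already serializes each row as a JSON object, call it on the aggregated DataFrame rather than on the output of toJSON, which would otherwise be wrapped in a single string column named value.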
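As a rough illustration of that last step, here is a minimal, self-contained sketch that builds the sample data, nests it, and writes it out. It assumes Spark 2.x or later (it uses SparkSession rather than the SQLContext from the question); the names NestedJsonSketch, nestVehicles, and the output directory nested_json_out are made up for this example and do not come from the original post.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{collect_list, struct}

// Hypothetical object and method names, used only for illustration.
object NestedJsonSketch {

  // Nest VEHICLENUMBER and CUSTOMERID into one VEHICLE array per ACCOUNTNO.
  def nestVehicles(df: DataFrame): DataFrame =
    df.groupBy("ACCOUNTNO")
      .agg(collect_list(struct("VEHICLENUMBER", "CUSTOMERID")).as("VEHICLE"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("NestedJsonSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._ // enables toDF on a Seq of tuples

    // Same sample rows as in the answer above.
    val df = Seq(
      (10003014, "MH43AJ411", 20000000),
      (10003014, "MH43AJ411", 20000001),
      (10003015, "MH12GZ3392", 20000002)
    ).toDF("ACCOUNTNO", "VEHICLENUMBER", "CUSTOMERID")

    // Assumed output location; Spark creates a directory containing part files.
    val outputPath = "nested_json_out"

    // Write the aggregated DataFrame directly, without calling toJSON first.
    nestVehicles(df).coalesce(1).write.mode("overwrite").json(outputPath)

    spark.stop()
  }
}
```

With this approach the output is a directory of part files holding one JSON object per line (one per account), not a single enclosing JSON document. To read from the database instead of the in-memory sample, the DataFrame loaded over JDBC in the question could be passed to nestVehicles in the same way.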