How to read JSON with a schema in Spark DataFrames/Spark SQL
Problem description
sql/dataframes, please help me out or suggest a good way to read this JSON
{
"billdate":"2016-08-08',
"accountid":"xxx"
"accountdetails":{
"total":"1.1"
"category":[
{
"desc":"one",
"currentinfo":{
"value":"10"
},
"subcategory":[
{
"categoryDesc":"sub",
"value":"10",
"currentinfo":{
"value":"10"
}
}]
}]
}
}
Thanks
Recommended answer
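It seems your JSON is not valid: the "billdate" value opens with a double quote but closes with a single quote, and the commas after "accountid":"xxx" and "total":"1.1" are missing. Please check it with http://www.jsoneditoronline.org/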
See an-introduction-to-json-support-in-spark-sql.html
If you want to register the DataFrame as a table, you can do so as shown below and then print the schema.
DataFrame df = sqlContext.read().json("/path/to/validjsonfile").toDF();
df.registerTempTable("df");
df.printSchema();
Below is a sample code snippet (toplevel and sublevel stand for field names in your own JSON):
DataFrame app = df.select("toplevel");
app.registerTempTable("toplevel");
app.printSchema();
app.show();
DataFrame appName = app.select("toplevel.sublevel");
appName.registerTempTable("sublevel");
appName.printSchema();
appName.show();
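For the JSON in the question, an explicit schema can be supplied instead of relying on inference. The following is a minimal sketch, not the original answerer's code. It assumes Spark 2.2 or later (SparkSession, and read.json on a Dataset[String]), a local session, and a repaired copy of the question's JSON. The names fixedJson, billSchema and bills are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types._

// Assumption: a local session; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("json-schema-sketch").getOrCreate()
import spark.implicits._

// Repaired copy of the question's JSON (closing quote on billdate fixed, missing commas added).
val fixedJson =
  """{"billdate":"2016-08-08","accountid":"xxx","accountdetails":{"total":"1.1","category":[{"desc":"one","currentinfo":{"value":"10"},"subcategory":[{"categoryDesc":"sub","value":"10","currentinfo":{"value":"10"}}]}]}}"""

// Build the schema bottom-up; every value in the sample is a JSON string, so StringType is used.
val currentInfo = StructType(Seq(StructField("value", StringType)))
val subcategory = StructType(Seq(
  StructField("categoryDesc", StringType),
  StructField("value", StringType),
  StructField("currentinfo", currentInfo)))
val category = StructType(Seq(
  StructField("desc", StringType),
  StructField("currentinfo", currentInfo),
  StructField("subcategory", ArrayType(subcategory))))
val billSchema = StructType(Seq(
  StructField("billdate", StringType),
  StructField("accountid", StringType),
  StructField("accountdetails", StructType(Seq(
    StructField("total", StringType),
    StructField("category", ArrayType(category)))))))

// Read with the explicit schema instead of letting Spark infer one.
val bills = spark.read.schema(billSchema).json(Seq(fixedJson).toDS())
bills.printSchema()

// Nested struct fields are reachable with dot notation.
val total = bills.select("accountdetails.total").as[String].head()

// Arrays of structs can be flattened with functions.explode before selecting inner fields.
val descs = bills
  .select(explode(col("accountdetails.category")).as("cat"))
  .select("cat.desc")
  .as[String]
  .collect()

// The same data queried through Spark SQL.
bills.createOrReplaceTempView("bills")
val totalViaSql = spark.sql("SELECT accountdetails.total AS total FROM bills").as[String].head()
```

When reading from a file rather than an in-memory string, note that Spark's JSON reader expects one JSON object per line by default. A pretty-printed file like the one in the question would additionally need option("multiLine", true), which is available from Spark 2.2.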
Scala example, using the following people.json:
{"name":"Michael", "cities":["palo alto", "menlo park"], "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "cities":["santa cruz"], "schools":[{"sname":"ucsb", "year":2011}]}
{"name":"Justin", "cities":["portland"], "schools":[{"sname":"berkeley", "year":2014}]}
val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame
Reading top-level fields
val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])
names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)
Use the select() method to specify the top-level field, collect() to collect it into an Array[Row], and the getString() method to access a column inside each Row.
Each person has an array of "cities". Let's flatten these arrays and read out all their elements.
val flattened = people.explode("cities", "city"){c: List[String] => c}
flattened: org.apache.spark.sql.DataFrame
val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]
allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)
The explode() method explodes, or flattens, the cities array into a new column named "city". We then use select() to select the new column, collect() to collect it into an Array[Row], and getString() to access the data inside each Row.
Next, read out the "schools" data, which is an array of nested JSON objects. Each element of the array holds the school name and year:
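The DataFrame.explode method used above has been deprecated since Spark 2.0. On newer versions the same flattening is usually written with the explode function from org.apache.spark.sql.functions. The following is a minimal sketch, assuming Spark 2.2 or later and a local session. It reads the same three records from an in-memory Dataset[String] rather than from people.json, and peopleJson is a name made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Assumption: a local session; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("explode-cities-sketch").getOrCreate()
import spark.implicits._

// The name and cities fields of the three sample records, one JSON object per string.
val peopleJson = Seq(
  """{"name":"Michael", "cities":["palo alto", "menlo park"]}""",
  """{"name":"Andy", "cities":["santa cruz"]}""",
  """{"name":"Justin", "cities":["portland"]}""")
val people = spark.read.json(peopleJson.toDS())

// explode() emits one output row per array element; the new column is named "city".
val flattened = people.select(col("name"), explode(col("cities")).as("city"))

// Converting to Dataset[String] avoids calling getString(0) on each Row.
val allCities = flattened.select("city").as[String].collect()
```

Keeping the name column next to each exploded city preserves which person a city belongs to, which the collect-then-map approach loses.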
val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]
val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]
schoolsArr.foreach(schools => {
schools.map(row => print(row.getString(0), row.getLong(1)))
print("\n")
})
(stanford,2010)(berkeley,2012)
(ucsb,2011)
(berkeley,2014)
Use select() and collect() to select the "schools" array and collect it into an Array[Row]. Each "schools" array is a sequence of Row objects (Seq[Row]), so we read it out with the getSeq[Row]() method. Finally, we can read the information for each individual school by calling getString() for the school name and getLong() for the school year.
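The nested "schools" array can also be flattened inside the DataFrame, so that no Row has to be unpacked by hand on the driver. The following is a minimal sketch, assuming Spark 2.2 or later and a local session. It uses the same three sample records held in memory, and the column aliases school, sname and year are choices made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Assumption: a local session; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("explode-schools-sketch").getOrCreate()
import spark.implicits._

// The name and schools fields of the three sample records, one JSON object per string.
val peopleJson = Seq(
  """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}""",
  """{"name":"Andy", "schools":[{"sname":"ucsb", "year":2011}]}""",
  """{"name":"Justin", "schools":[{"sname":"berkeley", "year":2014}]}""")
val people = spark.read.json(peopleJson.toDS())

// One row per (person, school); fields of the struct are then reached with dot notation.
val schools = people
  .select(col("name"), explode(col("schools")).as("school"))
  .select(col("name"), col("school.sname").as("sname"), col("school.year").as("year"))

// JSON integers are inferred as LongType, hence Long in the tuple type.
val schoolRows = schools.as[(String, String, Long)].collect()
schoolRows.foreach { case (name, sname, year) => println(s"$name: $sname, $year") }
```

Selecting fields by name also avoids depending on the position of sname and year inside the struct, which getString(0) and getLong(1) rely on.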