将JSON文本字段火花到RDD [英] Spark JSON text field to RDD
问题描述
我有一个cassandra表,其中的文本类型字段名为"snapshot",其中包含JSON对象:
I've got a cassandra table with a field of type text named snapshot containing JSON objects:
[identifier, timestamp, snapshot]
我了解到,要能够使用Spark在该字段上进行转换,我需要将该RDD的该字段转换为另一个RDD,以便在JSON模式上进行转换.
I understood that to be able to do transformations on that field with Spark, I need to convert that field of that RDD to another RDD to make transformations on the JSON schema.
那是正确的吗?我应该如何进行呢?
Is that correct? How should I proceed to to that?
目前,我设法通过单个文本字段创建了一个RDD:
For now I managed to create an RDD from a single text field:
val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val first = snapshots.first()
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3)))
firstJson.printSchema()
其中显示了JSON模式.好!
Which shows me the JSON schema. Good!
如何继续告诉Spark应该将此模式应用于表快照的所有行,以便从每一行获取该快照字段的RDD?
How do I proceed to tell Spark that this schema should be applied on all rows of the table Snapshots, to get an RDD on that snapshot field from each row?
推荐答案
几乎在这里,您只想将带有json的RDD [String]传递到
jsonRDD
方法
Almost there, you just want to pass your an RDD[String] with your json into the
jsonRDD
method
val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val jsons = snapshots.map(_._3) // Get Third Row Element Json(RDD[String])
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // Pass in RDD directly
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson where .... ").collect
一个简单的例子
A quick example
val stringRDD = sc.parallelize(Seq("""
{ "isActive": false,
"balance": "$1,431.73",
"picture": "http://placehold.it/32x32",
"age": 35,
"eyeColor": "blue"
}""",
"""{
"isActive": true,
"balance": "$2,515.60",
"picture": "http://placehold.it/32x32",
"age": 34,
"eyeColor": "blue"
}""",
"""{
"isActive": false,
"balance": "$3,765.29",
"picture": "http://placehold.it/32x32",
"age": 26,
"eyeColor": "blue"
}""")
)
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson")
csc.sql("SELECT age from testjson").collect
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26])
这篇关于将JSON文本字段火花到RDD的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!