Spark JSON text field to RDD


Problem description

I've got a Cassandra table with a field of type text named snapshot containing JSON objects:

[identifier, timestamp, snapshot]

I understand that in order to do transformations on that field with Spark, I need to convert that field of the RDD into another RDD so I can apply transformations against the JSON schema.

Is that correct? How should I proceed to do that?

For now I have managed to create an RDD from a single text field:

// Imports assumed for this snippet: Spark core, Spark SQL, and the DataStax Spark-Cassandra connector
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// each row comes back as an (identifier, timestamp, snapshot) tuple
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val first = snapshots.first()
// wrap the single snapshot string in an RDD so Spark SQL can infer its JSON schema
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3)))
firstJson.printSchema()

This shows me the JSON schema. Good!

How do I proceed to tell Spark that this schema should be applied to all rows of the snapshots table, so that I get an RDD over the snapshot field of every row?

Recommended answer

You're almost there; you just need to pass an RDD[String] containing your JSON into the jsonRDD method:

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val jsons = snapshots.map(_._3)               // keep only the third tuple element, the JSON text (RDD[String])
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // pass the RDD[String] in directly; the schema is inferred over all rows
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson where .... ").collect


A quick example:

val stringRDD = sc.parallelize(Seq(""" 
  { "isActive": false,
    "balance": "$1,431.73",
    "picture": "http://placehold.it/32x32",
    "age": 35,
    "eyeColor": "blue"
  }""",
   """{
    "isActive": true,
    "balance": "$2,515.60",
    "picture": "http://placehold.it/32x32",
    "age": 34,
    "eyeColor": "blue"
  }""", 
  """{
    "isActive": false,
    "balance": "$3,765.29",
    "picture": "http://placehold.it/32x32",
    "age": 26,
    "eyeColor": "blue"
  }""")
)
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson")
sqlContext.sql("SELECT age from testjson").collect
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26])
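The WHERE clause left as a placeholder in the answer above can filter on any of the inferred fields. A minimal sketch against this sample data (field names taken from the JSON strings above):

// filter on an inferred JSON field; reuses the "testjson" temp table registered above
sqlContext.sql("SELECT age FROM testjson WHERE isActive = true").collect
// only the second sample record has isActive = true, so this should return Array([34])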

