How to query JSON data column using Spark DataFrames?

Question

I have a Cassandra table that, for simplicity, looks something like:

key: text
jsonData: text
blobData: blob

I can create a basic data frame for this using Spark and the spark-cassandra-connector:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

I'm struggling, though, to expand the JSON data into its underlying structure. I ultimately want to be able to filter based on the attributes within the JSON string and return the blob data. Something like jsonData.foo = "bar" and return blobData. Is this currently possible?

Answer

Spark 1.6+

You can use get_json_object, which takes a column and a path:

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

and extracts fields as individual strings, which can be further cast to the expected types.
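
For example, the original question (filter on a JSON attribute, return blobData) could be answered with this approach as follows. This is a minimal sketch, not part of the original answer; the field names k and v come from the expressions above, and the cast of v to double is an assumption:

import org.apache.spark.sql.functions.get_json_object

// Extract JSON attributes as columns (cast "v" to double as an assumption
// about the data), keeping the blob column alongside them.
val withFields = df.select(
  $"key",
  get_json_object($"jsonData", "$.k").alias("k"),
  get_json_object($"jsonData", "$.v").cast("double").alias("v"),
  $"blobData")

// Filter on a JSON attribute and return only the blob column,
// i.e. jsonData.k = "bar" -> blobData.
withFields.where($"k" === "bar").select($"blobData")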

Spark <= 1.5

Is this currently possible?

As far as I know it is not directly possible. You can try something similar to this:

val df = sc.parallelize(Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

I assume that the blob field cannot be represented in JSON. Otherwise you can omit the splitting and joining:

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
  case Row(key: String, json: String) =>
    s"""{"key": "$key", "jsonData": $json}"""
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: long (nullable = true)
//  |-- blobData: string (nullable = true)
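
With jsonData now a struct, the filter from the question becomes a plain nested-field lookup. A minimal sketch, using the field names from the sample data above:

// Filter on a nested attribute and keep only the blob column,
// mirroring the question's "jsonData.foo = bar, return blobData".
parsed
  .where($"jsonData.k" === "bar")
  .select($"blobData")
  .show()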

An alternative (cheaper, although more complex) approach is to use a UDF to parse the JSON and output a struct or map column. For example something like this:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)
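
The parsed struct column can likewise be flattened into ordinary columns and filtered. A sketch, reusing the field names from the KV case class above:

// Promote the parsed struct fields to top-level columns,
// then filter and return the blob column.
val flat = parsed.select(
  $"key",
  $"parsedJSON.k".alias("k"),
  $"parsedJSON.v".alias("v"),
  $"blobData")

flat.where($"k" === "bar").select($"blobData").show()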
