Extract Nested Json fields from DynamoDB JSON string using Spark?


Question

I am reading a DynamoDB table from Spark. The table has a JSON string in one field and plain strings in the other fields. I am able to read the JSON field, but not the nested JSON fields. This is not a duplicate of "query Json Column using dataframes": that question explains how to extract columns from a JSON string, but not nested JSON columns.

import com.github.traviscrawford.spark.dynamodb._
val users = sqlContext.read.dynamodb("Dynamodb_table")

users.show(1)

Sample data set

 |col1                                                           | ID | field2|field3|
 --------------------------------------------------------------------------------------
 |{"a":[{"b":"value1","x":23},{"b":"value2","x":52}],"c":"valC"}|A1  | X1    |Y1    |

I need to extract a few fields from col1 (the JSON structure) and the ID field. I am able to parse the JSON field (col1) and get field 'c' from col1 as explained here, but I am not able to extract the nested fields.

My code:

val users = sqlContext.read.dynamodb("Dynamodb_table")
val data = users.selectExpr("get_json_object(col1, '$.a') AS a", "get_json_object(col1, '$.c') AS c", "ID")

data.show(1,false)
|a                                                |c   |ID|
-----------------------------------------------------------
|[{"b":"value1","x":23},{"b":"value2","x":52}...] |valC|A1|

Now when I try to apply the same get_json_object to the above data frame, I get all null values.

val nestedData = data.selectExpr("get_json_object(a, '$.b')","c","ID")
nestedData.show(false)

|get_json_object(a, '$.b')| c  | ID|
------------------------------------
|null                     |valC|A1 |    
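
(An aside, not from the original post: get_json_object returns null here because column a now holds a JSON array at its root, so the object path '$.b' matches nothing. Assuming Spark's Hive-style JSON paths, an index-based path can reach into the array, though only one element at a time — a hedged sketch:)

// Hedged sketch: '$[0].b' addresses the first array element only; it
// cannot flatten the whole array, which is why the answer below parses
// the array properly instead.
val firstB = data.selectExpr("get_json_object(a, '$[0].b') AS b0", "c", "ID")
firstB.show(false)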

I tried explode as well, since col 'a' holds an array of structs, but that didn't work either: the data frame 'data' returns col/field 'a' as a string instead of an array. Any ideas how to solve this?

Update: I also tried parsing using JSON4S and net.liftweb.json.parse. That didn't help either.

case class aInfo(b: String)
case class col1(a: Option[aInfo], c: String)

import net.liftweb.json.parse
val parseJson = udf((data: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(data).extract[col1]
})

val parsed = users.withColumn("parsedJSON", parseJson($"col1"))
parsed.show(1)

All values came out as null when I used these parsers.

My expected result: I am trying to get a flattened-out structure from the dataset:

|b     |x |c   | ID|
--------------------
|value1|23|valC|A1 |
|value2|52|valC|A1 |


Answer

I believe that all the required pieces of the puzzle are already here, so let's follow this step by step. Your data is equivalent to:

val df = Seq((
  """{"a":[{"b":"value1"},{"b": "value2"}],"c":"valC"}""", "A1", "X1", "Y1"
)).toDF("col1", "ID",  "field2", "field3")

Spark provides json4s, which implements the same query API as Lift:

import org.json4s._
import org.json4s.jackson.JsonMethods._

and we can use, for example, its LINQ-style API to define a UDF:

import org.apache.spark.sql.functions.udf

val getBs = udf((s: String) => for { 
  JString(b) <- parse(s) \ "a" \ "b" 
} yield b)

If you want to extract multiple fields you can of course extend this. For example, if the JSON string has multiple fields:

{"a":[{"b":"value1","d":1},{"b":"value2","d":2}],"c":"valC"}

you can write:

for  {
  JObject(a) <- parse(s) \ "a"
  JField("b", JString(b))  <- a
  JField("d", JInt(d))  <- a
} yield (b, d)
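
To call this from Spark, wrap the comprehension in a UDF. A minimal sketch (the getBDs name and the toLong conversion are mine, not from the original answer; JInt carries a BigInt, which Spark cannot encode directly):

// Hypothetical wrapper: returns an array of (b, d) pairs that Spark
// encodes as array<struct<_1:string,_2:bigint>>, ready for explode.
val getBDs = udf((s: String) => for {
  JObject(a) <- parse(s) \ "a"
  JField("b", JString(b)) <- a
  JField("d", JInt(d)) <- a
} yield (b, d.toLong))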

This assumes that both fields are present; otherwise there won't be a match. To handle missing fields you may prefer XPath-like expressions or extractors:

case class A(b: Option[String], d: Option[Int])

implicit val formats: Formats = DefaultFormats
(parse(s) \ "a").extract[Seq[A]]

A UDF like this can be used with explode to extract fields:

val withBs = df.withColumn("b", explode(getBs($"col1")))

The result:

+--------------------+---+------+------+------+
|                col1| ID|field2|field3|     b|
+--------------------+---+------+------+------+
|{"a":[{"b":"value...| A1|    X1|    Y1|value1|
|{"a":[{"b":"value...| A1|    X1|    Y1|value2|
+--------------------+---+------+------+------+

Your attempt to use Lift is incorrect because you expect a to be a sequence of aInfo but define it only as Option[aInfo]. It should be Option[Seq[aInfo]]:

case class col1(a: Option[Seq[aInfo]], c: String)

With the class defined like this, parsing should work without an issue.
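
For completeness, a hedged sketch of the corrected Lift extraction on the raw sample string (the local value s is mine; assumes the aInfo case class from the question):

// Run in a session without the json4s imports to avoid an ambiguous
// reference to parse.
import net.liftweb.json.{parse, DefaultFormats}

implicit val formats = DefaultFormats
val s = """{"a":[{"b":"value1"},{"b":"value2"}],"c":"valC"}"""
parse(s).extract[col1]
// col1(Some(List(aInfo(value1), aInfo(value2))), valC)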

If you use a current build (Spark 2.1.0), there is a from_json method, introduced by SPARK-17699, which requires a schema:

import org.apache.spark.sql.types._

val bSchema = StructType(Seq(StructField("b", StringType, true)))
val aSchema = StructField("a", ArrayType(bSchema), true)
val cSchema = StructField("c", StringType, true)

val schema =  StructType(Seq(aSchema, cSchema))

and it can be applied as:

import org.apache.spark.sql.functions.from_json

val parsed = df.withColumn("col1", from_json($"col1", schema))

After that you can select fields using the usual notation:

parsed.select($"col1.a.b")
