Spark Scala-从dataframe列解析json并返回带有列的RDD [英] Spark scala - parse json from dataframe column and return RDD with columns

查看:470
本文介绍了Spark Scala-从dataframe列解析json并返回带有列的RDD的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个sparkScala RDD,看起来像这样:

I have a sparkScala RDD that looks like this :

df.printSchema()

 |-- stock._id: string (nullable = true)
 |-- stock.value: string (nullable = true)

RDD的第二列是嵌套的JSON:

[ { ""warehouse"" : ""Type1"" , ""amount"" : ""0.0"" }, { ""warehouse"" : ""Type1"" , ""amount"" : ""25.0"" }]

我需要生成一个RDD,其中将包含现有的两列,也包含来自JSON的列,例如:

I need to generate an RDD that will contain the existing two columns but also the columns from the JSON like:

_id, value , warehouse , amount

我尝试使用自定义函数来实现它,但是我正在努力将此函数应用于我的RDD并获得所需的结果

I've tried to do it using custom functions, but I'm struggling to apply this function to my RDD and getting the needed result

import org.json4s.jackson.JsonMethods._

import org.json4s._

 def extractWarehouses (value: String)  {
    val json = parse(value)
    for {
      JObject(warehouses) <- json
      JField("warehouse", JString(warehouse)) <- warehouses
      JField("amount", JDouble(amount)) <- warehouses
    } yield (warehouse, amount)
  }

推荐答案

正如您所说的,value是一个json数组,其中保存json对象的列表,您需要对其进行分解并获取单独的属性,如下所示:

As you said value is a json array which is holding list of json objects, you need to explode it and get individual properties as columns something like below:

import org.apache.spark.sql.functions

val flattenedDF = df.select(functions.column("_id"), functions.explode(df("value")).as("value"))
val result = flattenedDF.select("_id", "value.warehouse", "value.amount")
result.printSchema()

这篇关于Spark Scala-从dataframe列解析json并返回带有列的RDD的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆