Spark from_json with dynamic schema


Question

I am trying to use Spark to process JSON data with a variable structure (nested JSON). The input JSON data could be very large, with more than 1,000 keys per row, and one batch could exceed 20 GB. The entire batch is generated from 30 data sources; the 'key2' of each JSON record identifies its source, and the structure of each source is predefined.

What would be the best approach for processing such data? I have tried using from_json as shown below, but it works only with a fixed schema, and to use it I would first need to group the data by source and then apply the matching schema. Due to the large data volume, my preferred choice is to scan the data only once and extract the required values from each source based on its predefined schema.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import spark.implicits._

val data = sc.parallelize(
    """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
    :: Nil)
val df = data.toDF

val schema = (new StructType)
    .add("key1", StringType)
    .add("key2", StringType)
    .add("key3", (new StructType)
        .add("key3_k1", StringType))

df.select(from_json($"value", schema).as("json_str"))
  .select($"json_str.key3.key3_k1").collect
res17: Array[org.apache.spark.sql.Row] = Array([xxx])
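For reference, the group-by-source approach described above could be sketched roughly as follows. This is an untested sketch: the `schemasBySource` map and its contents are hypothetical stand-ins for the 30 predefined per-source schemas, and `get_json_object` is used only to read `key2` without parsing the full record.

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{from_json, get_json_object}
import spark.implicits._

// Hypothetical map: source identifier (value of key2) -> predefined schema.
val schemasBySource: Map[String, StructType] = Map(
  "source1" -> (new StructType)
    .add("key1", StringType)
    .add("key2", StringType)
    .add("key3", (new StructType).add("key3_k1", StringType))
  // ... one entry per data source
)

// One filtered pass per source: select rows whose key2 matches,
// then parse them with that source's schema.
val parsedBySource = schemasBySource.map { case (source, schema) =>
  source -> df
    .filter(get_json_object($"value", "$.key2") === source)
    .select(from_json($"value", schema).as("json"))
}
```

Note this still scans the input once per source, which is the cost the question is trying to avoid.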

Answer

This is just a restatement of @Ramesh Maharjan's answer, but with more modern Spark syntax.

I found this method lurking in DataFrameReader. It lets you parse JSON strings from a Dataset[String] into an arbitrary DataFrame, taking advantage of the same schema inference Spark gives you with spark.read.json("filepath") when reading directly from a JSON file. The schema of each row can be completely different.

def json(jsonDataset: Dataset[String]): DataFrame

Example usage:

val jsonStringDs = spark.createDataset[String](Seq(
  """{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}""",
  """{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}"""))

jsonStringDs.show

jsonStringDs: org.apache.spark.sql.Dataset[String] = [value: string]
+----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                 |
+----------------------------------------------------------------------------------------------------------------------+
|{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}|
|{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}  |
+----------------------------------------------------------------------------------------------------------------------+


val df = spark.read.json(jsonStringDs)
df.show(false)

df: org.apache.spark.sql.DataFrame = [CEO: string, address: struct ... 6 more fields]
+----------+------------------+-------------+---------+--------+------------+------+------------+
|CEO       |address           |employeeCount|firstname|lastname|marketCap   |name  |revenue     |
+----------+------------------+-------------+---------+--------+------------+------+------------+
|null      |[London,Baker,121]|null         |Sherlock |Holmes  |null        |null  |null        |
|Jeff Bezos|null              |500000       |null     |null    |817117000000|Amazon|177900000000|
+----------+------------------+-------------+---------+--------+------------+------+------------+
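Applied to the original question, this inferred DataFrame could then be split by source without re-reading the raw JSON; rows from other sources simply carry nulls in the absent columns. A minimal sketch, reusing the `jsonStringDs` sample above (the column choices are illustrative, not the questioner's actual fields):

```scala
// Parse the whole batch once; the inferred schema is the union of
// all per-source fields, with nulls where a field is absent.
val all = spark.read.json(jsonStringDs)

// Per-source extraction: filter on a field that identifies the source
// (the question would use key2), then select that source's columns.
val people = all
  .filter($"firstname".isNotNull)
  .select($"firstname", $"lastname", $"address.city")

val companies = all
  .filter($"name".isNotNull)
  .select($"name", $"employeeCount", $"CEO")
```

The trade-off is that schema inference itself requires an extra pass over the data, so for a fixed set of known sources the predefined-schema route may still be cheaper.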

The method is available from Spark 2.2.0: http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(jsonDataset:org.apache.spark.sql.Dataset[String]):org.apache.spark.sql.DataFrame
