How to use from_json with schema as string (i.e. a JSON-encoded schema)?


Problem Description


I'm reading a stream from Kafka and converting the value from Kafka (which is JSON) into a struct.

from_json has a variant that takes a schema of type String, but I could not find an example. Please advise what is wrong in the code below.

Error

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',

== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING )  )  ) 
-------^^^

at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)

Program

public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";

    SparkSession sparkSession = SparkSession
            .builder().appName(EmployeeSchemaLoader.class.getName())
            .master(master).getOrCreate();

   String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
            "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING )  )  ) ";

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();

    Dataset<Row> employeeDataset = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics)
            .load();
    employeeDataset.printSchema();
    employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
            functions.from_json(employeeDataset.col("strValue"),employeeSchema, new HashMap<>()));

    employeeDataset.printSchema();
    employeeDataset.createOrReplaceTempView("employeeView");

    sparkSession.catalog().listTables().show();

    sqlCtx.sql("select * from employeeView").show();
}

Solution

Your question helped me discover that the variant of from_json with a String-based schema was only available in Java, and has only recently been added to the Spark API for Scala for the upcoming 2.3.0. I had long lived with the firm belief that the Spark API for Scala was always the most feature-rich, and your question helped me learn that this was not so before the change in 2.3.0 (!)

Back to your question: you can actually define the string-based schema in either JSON or DDL format.
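The DDL form is also what the parser error in your code points at: in DDL, the field lists after STRUCT and ARRAY go in angle brackets, while your employeeSchema used parentheses, which is the "extraneous input '('" the ParseException complains about. Here is a minimal sketch of the corrected schema string in your Java program (assuming a Spark version whose String-based from_json accepts DDL, e.g. 2.3.0+):

// DDL-format schema string: angle brackets after STRUCT and ARRAY,
// where the original code used parentheses
String employeeSchema = "STRUCT<firstName: STRING, lastName: STRING, email: STRING, " +
        "addresses: ARRAY<STRUCT<city: STRING, state: STRING, zip: STRING>>>";

// Same from_json call as before; only the schema string changes
employeeDataset = employeeDataset.withColumn("employeeRecord",
        functions.from_json(employeeDataset.col("strValue"), employeeSchema, new HashMap<>()));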

Writing JSON by hand may be a bit cumbersome, so I'd take a different approach (which, given that I'm a Scala developer, is fairly easy).

Let's first define the schema using Spark API for Scala.

import org.apache.spark.sql.types._
val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)

That seems to match your schema, doesn't it?

With that, converting the schema to a JSON-encoded string is a breeze with the json method.

val schemaAsJson = schema.json

schemaAsJson is exactly your JSON string, which looks pretty...hmmm...complex. For display purposes I'd rather use the prettyJson method.

scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}

That's your schema in JSON.

You can use DataType to "validate" the JSON string (using DataType.fromJson, which Spark uses under the covers for from_json).

import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>

All seems fine. Mind if I check this out with a sample dataset?

val rawJsons = Seq("""
  {
    "firstName" : "Jacek",
    "lastName" : "Laskowski",
    "email" : "jacek@japila.pl",
    "addresses" : [
      {
        "city" : "Warsaw",
        "state" : "N/A",
        "zip" : "02-791"
      }
    ]
  }
""").toDF("rawjson")
val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"addresses")) // <-- explode the array field
  .drop("addresses")  // <-- no longer needed
  .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName|          email|  city|state|   zip|
+---------+---------+---------------+------+-----+------+
|    Jacek|Laskowski|jacek@japila.pl|Warsaw|  N/A|02-791|
+---------+---------+---------------+------+-----+------+
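
Back in your Java program, the JSON-encoded string works with the very same from_json variant. A sketch, assuming you save the exact output of schema.json above into a hypothetical employee-schema.json file on the driver:

import java.nio.file.Files;
import java.nio.file.Paths;

// Load the JSON-encoded schema produced by schema.json
// (employee-schema.json is a hypothetical file holding that exact string)
String employeeSchema = new String(Files.readAllBytes(Paths.get("employee-schema.json")));

employeeDataset = employeeDataset.withColumn("employeeRecord",
        functions.from_json(employeeDataset.col("strValue"), employeeSchema, new HashMap<>()));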
