How to JSON-escape a String field in Spark dataFrame with new column


Question

How can I write a new column in JSON format through a DataFrame? I tried several approaches, but they all write the data as a JSON-escaped String field. Currently it is written as {"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}

Instead, I want it to be written as {"test":{"id":1,"name":"name","problem_field": {"x":100,"y":200}}}

problem_field is a new column that is created based on the values read from other fields:

val dataFrame = oldDF.withColumn("problem_field", s)
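
For context, here is a minimal sketch of how such a string column might be produced (hypothetical, since the question does not show how s is defined). to_json always returns a StringType column, which is why a later write.json serializes it as an escaped string rather than a nested object:

import org.apache.spark.sql.functions.{struct, to_json}

// Hypothetical construction of problem_field from other fields x and y.
// to_json yields a plain String column such as "{\"x\":100,\"y\":200}",
// so write.json re-escapes it instead of nesting it.
val s = to_json(struct($"x", $"y"))
val dataFrame = oldDF.withColumn("problem_field", s)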

I tried the following approaches:

  1. dataFrame.write.json(<<outputPath>>)
  2. dataFrame.toJSON.map(value => value.replace("\\", "").replace("{\"value\":\"", "").replace("}\"}", "}")).write.json(<<outputPath>>)

I tried converting to a Dataset as well, but no luck. Any pointers are greatly appreciated.

I have already tried the logic mentioned here: How to let Spark parse a JSON-escaped String field as a JSON Object to infer the proper structure in DataFrames?

Answer

For starters, your example data has an extraneous comma after "y\":200, which will prevent it from being parsed, as it is not valid JSON.

From there, you can use from_json to parse the field, assuming you know the schema. In this example, I'm parsing the field separately to first get the schema:

scala> val json = spark.read.json(Seq("""{"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> json.printSchema
root
 |-- test: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: string (nullable = true)


scala> val problem_field = spark.read.json(json.select($"test.problem_field").map{
case org.apache.spark.sql.Row(x : String) => x
})
problem_field: org.apache.spark.sql.DataFrame = [x: bigint, y: bigint]          

scala> problem_field.printSchema
root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)
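
With the schema applied, writing fixed back out produces the nested object form asked for in the question. A quick check (illustrative output, not part of the original transcript):

scala> fixed.toJSON.first
res0: String = {"test":{"id":1,"name":"name","problem_field":{"x":100,"y":200}}}

scala> fixed.write.json("<<outputPath>>")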

If the schema of problem_field's contents is inconsistent between rows, this solution will still work, but it may not be the optimal way of handling things, as it will produce a sparse DataFrame where each row contains every field encountered in any problem_field. For example:

scala> val json = spark.read.json(Seq("""{"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}""", """{"test":{"id":1,"name":"name","problem_field": "{\"a\":10,\"b\":20}"}}""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> val problem_field = spark.read.json(json.select($"test.problem_field").map{case org.apache.spark.sql.Row(x : String) => x})
problem_field: org.apache.spark.sql.DataFrame = [a: bigint, b: bigint ... 2 more fields]

scala> problem_field.printSchema
root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)

scala> fixed.select($"test.problem_field.*").show
+----+----+----+----+
|   a|   b|   x|   y|
+----+----+----+----+
|null|null| 100| 200|
|  10|  20|null|null|
+----+----+----+----+

Over the course of hundreds, thousands, or millions of rows, you can see how this would present a problem.
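
If the per-row fields vary but the values share a single type, an alternative (a sketch, not from the original answer) is to parse problem_field with a MapType instead of an inferred struct, which avoids the sparse columns:

scala> import org.apache.spark.sql.types.{LongType, MapType, StringType}

scala> val asMap = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", MapType(StringType, LongType)).as("problem_field")))

scala> asMap.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: long (valueContainsNull = true)

Each row then carries only its own keys, at the cost of forcing all values to one type; write.json still emits problem_field as a nested JSON object.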
