How to JSON-escape a String field in Spark dataFrame with new column
Question
How to write a new column with JSON format through DataFrame? I tried several approaches but it's writing the data as a JSON-escaped String field.
Currently it's writing as:
{"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}
Instead, I want it like:
{"test":{"id":1,"name":"name","problem_field":{"x":100,"y":200}}}
problem_field is a new column that is being created based on the values read from other fields, as:
val dataFrame = oldDF.withColumn("problem_field", s)
I tried the following approaches:
- dataFrame.write.json(<<outputPath>>)
- dataFrame.toJSON.map(value => value.replace("\\", "").replace("{\"value\":\"", "").replace("}\"}", "}")).write.json(<<outputPath>>)
Tried converting to a DataSet as well, but no luck. Any pointers are greatly appreciated.
I have already tried the logic mentioned here: How to let Spark parse a JSON-escaped String field as a JSON Object to infer the proper structure in DataFrames?
Answer
For starters, your example data has an extraneous comma after "y\":200, which will prevent it from being parsed as it is not valid JSON.
From there, you can use from_json to parse the field, assuming you know the schema. In this example, I'm parsing the field separately to first get the schema:
scala> val json = spark.read.json(Seq("""{"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]
scala> json.printSchema
root
 |-- test: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: string (nullable = true)
scala> val problem_field = spark.read.json(json.select($"test.problem_field").map{
case org.apache.spark.sql.Row(x : String) => x
})
problem_field: org.apache.spark.sql.DataFrame = [x: bigint, y: bigint]
scala> problem_field.printSchema
root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)
scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]
scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)
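From here, writing fixed back out should give the un-escaped structure the question asks for. A quick check (the output path below is a placeholder):

scala> fixed.write.json("<outputPath>")

Each line of the output should then look like {"test":{"id":1,"name":"name","problem_field":{"x":100,"y":200}}} rather than containing an escaped string.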
If the schema of problem_field's contents is inconsistent between rows, this solution will still work, but it may not be an optimal way of handling things, as it will produce a sparse Dataframe where each row contains every field encountered in problem_field. For example:
scala> val json = spark.read.json(Seq("""{"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}""", """{"test":{"id":1,"name":"name","problem_field": "{\"a\":10,\"b\":20}"}}""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]
scala> val problem_field = spark.read.json(json.select($"test.problem_field").map{case org.apache.spark.sql.Row(x : String) => x})
problem_field: org.apache.spark.sql.DataFrame = [a: bigint, b: bigint ... 2 more fields]
scala> problem_field.printSchema
root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)
scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]
scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)
scala> fixed.select($"test.problem_field.*").show
+----+----+----+----+
| a| b| x| y|
+----+----+----+----+
|null|null| 100| 200|
| 10| 20|null|null|
+----+----+----+----+
Over the course of hundreds, thousands, or millions of rows, you can see how this would present a problem.
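One alternative worth noting (it is not part of the answer above): if the keys of problem_field vary between rows but the values share a type, the field can be parsed as a map instead of a struct, which avoids the sparse columns at the cost of losing a fixed set of fields. A minimal sketch, assuming Spark 2.2+ and that all values fit in a long:

scala> // sketch only: parse problem_field as a map (assumes all values are numeric)
scala> import org.apache.spark.sql.types.{MapType, StringType, LongType}
import org.apache.spark.sql.types.{MapType, StringType, LongType}

scala> val asMap = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", MapType(StringType, LongType)).as("problem_field")))
asMap: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> asMap.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: long (valueContainsNull = true)

When written out as JSON, a map column serializes to a plain object, so the output shape matches the struct approach.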