Spark: How to parse and transform a JSON string from Spark data frame rows

Problem description

How do I parse and transform JSON strings from Spark DataFrame rows in PySpark?

I'm looking for help with how to:

  • parse the JSON string into a JSON struct (output 1)
  • transform the JSON string into columns a, b and id (output 2)

Background: I receive a large number of JSON strings (jstr1, jstr2, ...) via an API, and they are saved to a Spark DataFrame, one per row. I could infer the schema for each row separately, but that is not a workable solution: it is very slow because there are so many rows. Every jstr has the same structure: the columns/keys a and b stay the same, and only the id and the values change.

EDIT: blackbishop's solution of using a MapType schema works like a charm: schema = "map<string, array<struct<a:int,b:int>>>"

The question was extended to: How to transform a JSON string with multiple keys from Spark DataFrame rows in PySpark?

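from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F

# Reuse the active session (or start one) instead of relying on a predefined sqlContext
spark = SparkSession.builder.getOrCreate()

jstr1 = '{"id_1": [{"a": 1, "b": 2}, {"a": 3, "b": 4}]}'
jstr2 = '{"id_2": [{"a": 5, "b": 6}, {"a": 7, "b": 8}]}'

df = spark.createDataFrame([Row(json=jstr1), Row(json=jstr2)])

# The schema is inferred from the first row only
schema = F.schema_of_json(df.select(F.col("json")).take(1)[0].json)
df2 = df.withColumn('json', F.from_json(F.col('json'), schema))
df2.show()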

Current output:

+--------------------+
|                json|
+--------------------+
|[[[1, 2], [3, 4]]]  |
|                  []|
+--------------------+

Required output 1:

+--------------------+-------+
|        json        |  id   |
+--------------------+-------+
|[[[1, 2], [3, 4]]]  | id_1  |
|[[[5, 6], [7, 8]]]  | id_2  |
+--------------------+-------+

Required output 2:

+---------+----------+-------+
|    a    |     b    |   id  |
+---------+----------+-------+
|    1    |    2     |  id_1 |
|    3    |    4     |  id_1 |
|    5    |    6     |  id_2 |
|    7    |    8     |  id_2 |
+---------+----------+-------+
 

Solution

You're getting null for the second row because the schema is inferred from the first row only, and it differs from the second row's: the first row's top-level key is id_1, while the second row's is id_2. Instead, you can parse the JSON into a MapType, where the keys are strings and the values are arrays of structs:

schema = "map<string, array<struct<a:int,b:int>>>"

df = df.withColumn('json', F.from_json(F.col('json'), schema))

df.printSchema()
#root
# |-- json: map (nullable = true)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = true)
# |    |    |-- element: struct (containsNull = true)
# |    |    |    |-- a: integer (nullable = true)
# |    |    |    |-- b: integer (nullable = true)

Then, with some simple transformations, you get the expected outputs:

  • The id column is the key of the map; you get it with the map_keys function
  • The structs <a:int, b:int> are the map's values; you get them with the map_values function

output1 = df.withColumn("id", F.map_keys("json").getItem(0)) \
            .withColumn("json", F.map_values("json").getItem(0))

output1.show(truncate=False)

# +----------------+----+
# |json            |id  |
# +----------------+----+
# |[[1, 2], [3, 4]]|id_1|
# |[[5, 6], [7, 8]]|id_2|
# +----------------+----+

output2 = output1.withColumn("attr", F.explode("json")) \
    .select("id", "attr.*")

output2.show(truncate=False)

# +----+---+---+
# |id  |a  |b  |
# +----+---+---+
# |id_1|1  |2  |
# |id_1|3  |4  |
# |id_2|5  |6  |
# |id_2|7  |8  |
# +----+---+---+
