How to parse and transform json string from spark data frame rows in pyspark



How to parse and transform json string from spark dataframe rows in pyspark?

I'm looking for help with how to:

  • parse the json string into a json struct (output 1)
  • transform the json string into columns a, b and id (output 2)

Background: I get json strings via an API for a large number of rows (jstr1, jstr2, ...), which are saved to a spark df. I can read the schema for each row separately, but this is not a solution, as there are very many rows and it is far too slow. Each jstr has the same schema: the columns/keys a and b stay the same, only the id and the values in the columns change.

EDIT: blackbishop's solution of using a MapType schema works like a charm: schema = "map<string, array<struct<a:int,b:int>>>"

The question was extended to: How to transform a JSON string with multiple keys, from spark data frame rows in pyspark?

from pyspark.sql import Row
import pyspark.sql.functions as F
jstr1 = '{"id_1": [{"a": 1, "b": 2}, {"a": 3, "b": 4}]}'
jstr2 = '{"id_2": [{"a": 5, "b": 6}, {"a": 7, "b": 8}]}'
    
df = sqlContext.createDataFrame([Row(json=jstr1),Row(json=jstr2)])
    
schema = F.schema_of_json(df.select(F.col("json")).take(1)[0].json)
df2 = df.withColumn('json', F.from_json(F.col('json'), schema))
df2.show()

Current output:

+--------------------+
|                json|
+--------------------+
|  [[[1, 2], [3, 4]]]|
|                  []|
+--------------------+

Required output 1:

+------------------+----+
|              json|  id|
+------------------+----+
|[[[1, 2], [3, 4]]]|id_1|
|[[[5, 6], [7, 8]]]|id_2|
+------------------+----+

Required output 2:

+---+---+----+
|  a|  b|  id|
+---+---+----+
|  1|  2|id_1|
|  3|  4|id_1|
|  5|  6|id_2|
|  7|  8|id_2|
+---+---+----+

Solution

You're getting null for the second row because you're using only the schema inferred from the first row, which is different from that of the second one. You can parse the JSON into a MapType instead, where the keys are of type string and the values are arrays of structs:

schema = "map<string, array<struct<a:int,b:int>>>"

df = df.withColumn('json', F.from_json(F.col('json'), schema))

df.printSchema()
#root
# |-- json: map (nullable = true)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = true)
# |    |    |-- element: struct (containsNull = true)
# |    |    |    |-- a: integer (nullable = true)
# |    |    |    |-- b: integer (nullable = true)

Then, with some simple transformations, you get the expected outputs:

  • The id column corresponds to the key in the map; you get it with the map_keys function
  • The structs <a:int, b:int> are the values, which you get with the map_values function

output1 = df.withColumn("id", F.map_keys("json").getItem(0)) \
            .withColumn("json", F.map_values("json").getItem(0))

output1.show(truncate=False)

# +----------------+----+
# |json            |id  |
# +----------------+----+
# |[[1, 2], [3, 4]]|id_1|
# |[[5, 6], [7, 8]]|id_2|
# +----------------+----+

output2 = output1.withColumn("attr", F.explode("json")) \
    .select("id", "attr.*")

output2.show(truncate=False)

# +----+---+---+
# |id  |a  |b  |
# +----+---+---+
# |id_1|1  |2  |
# |id_1|3  |4  |
# |id_2|5  |6  |
# |id_2|7  |8  |
# +----+---+---+
