Convert multiple array of structs columns in pyspark sql


Question


I have a PySpark dataframe with multiple columns (around 30) of nested structs that I want to write to CSV (the struct schema is shown below).

In order to do it, I want to stringify all of the struct columns.

I've checked several answers here:

Pyspark converting an array of struct into string

PySpark: DataFrame - Convert Struct to Array

PySpark convert struct field inside array to string

This is the structure of my dataframe (with around 30 complex keys):

root  
 |-- 1_simple_key: string (nullable = true)  
 |-- 2_simple_key: string (nullable = true)  
 |-- 3_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 4_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 5_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  

The proposed solutions are for a single column, and I can't adapt them to multiple columns.

I want to do something like this:
1. For each struct_column:
2. col = stringify(struct_column)

I don't mind creating an additional dataframe for it. I just need to make it ready for csv writing.

Minimal reproducible example:

import pandas as pd
from pyspark.sql import Row

d = {'1_complex_key': {0: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=954, y=238), 1: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=956, y=250), 2: Row(type='1_complex_key', s=Row(n1=True, n2=False, n3=False), x=886, y=269)}, '2_complex_key': {0: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=901, y=235), 1: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=905, y=249), 2: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=868, y=270)}, '3_complex_key': {0: Row(type='3_complex_key', s=Row(n1=True, n2=False, n3=False), x=925, y=197), 1: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=928, y=206), 2: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=883, y=236)}}
df = pd.DataFrame.from_dict(d)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
s_df = spark.createDataFrame(df)
s_df.printSchema()
s_df.write.csv('it_doesnt_write.csv')

So - to summarize: I have a spark dataframe that I want to write to CSV. I can't write it to CSV because:

'CSV data source does not support struct<s:struct<n1:boolean,n2:boolean,n3:boolean>,type:string,x:bigint,y:bigint> data type.;'

So I want to perform some actions / reversible transformations on this dataframe so that I can write it to CSV, and later read it from the CSV and make it a spark dataframe with the same schema.

How can I do it? Thanks

Solution

As pault has already mentioned in the comments, you need a list comprehension. Such a list comprehension requires a list of columns and a function that converts these columns to strings. I will use df.columns and to_json, but you can also provide your own Python list of column names and a custom function to stringify your complex columns.

from pyspark.sql import functions as F

#this converts all columns to json strings
#and writes them to disk
s_df.select([F.to_json(x) for x in s_df.columns]).coalesce(1).write.csv('/tmp/testcsv')
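If you want to keep the original column names in the output, a minimal sketch of such a custom stringify function (the helper name is hypothetical, not part of the original answer):

def stringify(col_name):
    # hypothetical helper: serialize a complex column to a JSON string
    # while keeping the original column name via an alias
    # (to_json only accepts struct, array, or map columns)
    return F.to_json(F.col(col_name)).alias(col_name)

# all three columns in the minimal example above are structs,
# so stringifying every column is safe here
s_df.select([stringify(c) for c in s_df.columns]).show(truncate=False)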

In case you don't want to apply to_json to all columns, you can simply modify it like this:

list4tojson = ['2_complex_key', '3_complex_key']
s_df.select('1_complex_key', *[F.to_json(x) for x in list4tojson]).coalesce(1).write.csv('/tmp/testcsv')
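If you'd rather derive the list of complex columns from the schema instead of hardcoding it, here is a sketch, assuming every struct, array, or map column should be stringified:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StructType

# sketch: split columns into complex and simple ones based on the schema
complex_cols = [f.name for f in s_df.schema.fields
                if isinstance(f.dataType, (ArrayType, MapType, StructType))]
simple_cols = [f.name for f in s_df.schema.fields
               if f.name not in complex_cols]
s_df.select(*simple_cols, *[F.to_json(c) for c in complex_cols]).printSchema()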

You can restore the dataframe with from_json:

df = spark.read.csv('/tmp/testcsv')
df.printSchema()
#root
# |-- _c0: string (nullable = true)
# |-- _c1: string (nullable = true)
# |-- _c2: string (nullable = true)

#inferring the schema
json_schema = spark.read.json(df.rdd.map(lambda row: row._c0)).schema

df.select([F.from_json(x, json_schema) for x in df.columns]).printSchema()
#root
# |-- jsontostructs(_c0): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c1): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c2): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
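The parsed columns come back named jsontostructs(_c0) and so on. To restore the original names as well, you can alias the parsed columns — a sketch, assuming the CSV preserved the original column order:

from pyspark.sql import functions as F

# sketch: map the generic CSV column names back to the original ones
orig_names = ['1_complex_key', '2_complex_key', '3_complex_key']
restored = df.select([F.from_json(c, json_schema).alias(n)
                      for c, n in zip(df.columns, orig_names)])
restored.printSchema()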

In case you just want to store your data in a readable format, you can avoid all of the above code by writing it to json directly:

s_df.coalesce(1).write.json('/tmp/testjson')

df = spark.read.json('/tmp/testjson')
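A quick way to sanity-check this roundtrip is to compare schemas. Note that spark.read.json may return the columns in a different (alphabetical) order, so reorder before comparing — a sketch:

# reorder the read-back columns to match the original, then compare schemas
print(df.select(s_df.columns).schema == s_df.schema)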
