编写一个pyspark.sql.dataframe.DataFrame而不丢失信息 [英] Write a pyspark.sql.dataframe.DataFrame without losing information

查看:87
本文介绍了编写一个pyspark.sql.dataframe.DataFrame而不丢失信息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将pyspark.sql.dataframe.DataFrame保存为CSV格式(也可以是其他格式,只要它易于阅读).

到目前为止,我发现了几个保存DataFrame的示例.但是,每次我写它都会丢失信息.

数据集示例:

 #创建一个示例Pyspark DataFrame从pyspark.sql导入行员工=行(名字",姓氏",电子邮件",薪水")employee1 = Employee('A','AA','mail1',100000)employee2 = Employee('B','BB','mail2',120000)employee3 = Employee('C',None,'mail3',140000)employee4 = Employee('D','DD','mail4',160000)employee5 = Employee('E','EE','mail5',160000)department1 =行(id ='123',name ='HR')department2 =行(id ='456',name ='OPS')department3 =行(id ='789',name ='FN')department4 =行(id ='101112',name ='DEV')departmentWithEmployees1 =行(部门=部门1,雇员= [雇员1,雇员2,雇员5])departmentWithEmployees2 =行(部门=部门2,雇员= [雇员3,雇员4])departmentWithEmployees3 =行(部门=部门3,雇员= [雇员1,雇员4,雇员3])departmentWithEmployees4 =行(部门=部门4,雇员= [雇员2,雇员3])departmentWithEmployees_Seq = [departmentWithEmployees1,departmentWithEmployees2]dframe = spark.createDataFrame(departmentsWithEmployees_Seq) 

为了将此文件另存为CSV,我首先尝试

仅供参考:我正在使用Python在Databricks中工作.

因此,如何在不丢失信息的情况下写入数据(上例中的dframe)?

非常感谢!

修改为Pault添加图片,以显示csv(和标题)的格式.

Edit2 替换图片,例如csv输出:

运行Pault的代码后:

从pyspark.sql.functions中的

 导入to_jsondframe.select(* [dframe.columns中c的to_json(c).alias(c)])\.repartition(1).write.csv("junk_mycsv.csv",标头= True) 

输出不是整齐的,因为大多数列标题都是空的(由于嵌套格式?).仅复制第一行:

 部门员工(空的ColName)(空的ColName)(依此类推){\ id \:\" 123 \"\"名称\:\" HR \}" [{\ firstName \:\" A \"\" lastName \:\" AA \(.) 

解决方案

您的数据框具有以下架构:

  dframe.printSchema()#根#|-部门:struct(nullable = true)#||-id:字符串(可为空= true)#||-名称:字符串(nullable = true)#|-员工:数组(nullable = true)#||-元素:struct(containsNull = true)#|||-firstName:字符串(nullable = true)#|||-lastName:字符串(nullable = true)#|||-电子邮件:字符串(nullable = true)#|||-薪水:长(nullable = true) 

因此, department 列是具有两个命名字段的 StructType ,而 employees 列是具有四个命名字段的结构数组.似乎您想要以一种格式保存数据,该格式同时为每条记录保存 key value .

一种选择是以JSON格式而不是CSV格式写入文件:

  dframe.write.json("junk.json") 

哪个会产生以下输出:

  {部门":{"id":"123",名称":"HR"},员工":[{名字":"A",姓氏":"AA","email":"mail1",工资":100000},{"firstName":"B","lastName":"BB","email":"mail2",工资":120000},{"firstName:" E," lastName:" EE," email:" mail5,"薪水:160000}]}{部门":{"id":"456",名称":"OPS"},员工":[{名字":"C",电子邮件":"mail3",工资":140000},{"firstName":"D","lastName":"DD","email":"mail4",薪水":160000}]} 

或者如果您想将其保留为CSV格式,则可以使用

这将产生以下输出:

 "{\" id \:\" 123 \,\"名称\:\" HR \}","[{\"名字\:\" A \,\"lastName \":\"AA \",\电子邮件\":\"mail1 \",\薪水\":100000},{\"firstName \":\"B \",\"lastName \":\"BB \",\电子邮件\":\"mail2 \",\工资\":120000},{\名字\":\"E \",\姓氏":\"EE\,\" email \:\" mail5 \,\" salary \:160000}]""{\" id \:\" 456 \,\"名称\:\" OPS \}}","[{\"名字\:\" C \,\"电子邮件\:\"mail3 \",\工资\":140000},{\"firstName \":\"D \",\"lastName \":\"DD \",\"email \":\"mail4 \",\"salary \":160000}] 

请注意,双引号已转义.

I am trying to save an pyspark.sql.dataframe.DataFrame in CSV format (could also be another format, as long as it is easily readable).

So far, I found a couple of examples to save the DataFrame. However, it is losing information everytime that I write it.

Dataset example:

# Create an example Pyspark DataFrame

from pyspark.sql import Row

Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('A', 'AA', 'mail1', 100000)
employee2 = Employee('B', 'BB', 'mail2', 120000 )
employee3 = Employee('C', None, 'mail3', 140000 )
employee4 = Employee('D', 'DD', 'mail4', 160000 )
employee5 = Employee('E', 'EE', 'mail5', 160000 )

department1 = Row(id='123', name='HR')
department2 = Row(id='456', name='OPS')
department3 = Row(id='789', name='FN')
department4 = Row(id='101112', name='DEV')

departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2, employee5])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4, employee3])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

departmentsWithEmployees_Seq = [departmentWithEmployees1, departmentWithEmployees2]
dframe = spark.createDataFrame(departmentsWithEmployees_Seq)

In order to save this file as CSV, I firstly tried this solution:

type(dframe)
Out[]: pyspark.sql.dataframe.DataFrame
dframe.write.csv('junk_mycsv.csv')

Unfortunately, that result in this error:

org.apache.spark.sql.AnalysisException: CSV data source does not support struct<id:string,name:string> data type.; 

That is the reason why I tried another possibility, to convert the spark dataframe into a pandas dataframe, and save it then. As mentioned in this example.

pandas_df = dframe.toPandas()

Works good! However, If I show my data, it is missing data:

print(pandas_df.head())

department                                          employees
0   (123, HR)  [(A, AA, mail1, 100000), (B, BB, mail2, 120000...
1  (456, OPS)  [(C, None, mail3, 140000), (D, DD, mail4, 1600...

As you can see in the snapshot below, we are missing information. Because the data should be like this:

department              employees
0  id:123, name:HR      firstName: A, lastName: AA, email: mail1, salary: 100000

# Info is missing like 'id', 'name', 'firstName', 'lastName', 'email' etc. 
# For the complete expected example, see screenshow below. 

Just for information: I am working in Databricks, with Python.

Therefore, how can I write my data (dframe from the example above) without losing information?

Many thanks in advance!

Edit Adding a picture for Pault, to show the format of the csv (and the headers).

Edit2 Replacing the picture for example csv output:

After running Pault's code:

from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
    .repartition(1).write.csv("junk_mycsv.csv", header= True)

The output is not tidy, since most column headers are empty (due the nested format?). Only copying the first row:

department           employees              (empty ColName)     (empty ColName)   (and so on)
{\id\":\"123\"       \"name\":\"HR\"}"     [{\firstName\":\"A\"  \"lastName\":\"AA\"    (...)

解决方案

Your dataframe has the following schema:

dframe.printSchema()
#root
# |-- department: struct (nullable = true)
# |    |-- id: string (nullable = true)
# |    |-- name: string (nullable = true)
# |-- employees: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- firstName: string (nullable = true)
# |    |    |-- lastName: string (nullable = true)
# |    |    |-- email: string (nullable = true)
# |    |    |-- salary: long (nullable = true)

So the department column is a StructType with two named fields and the employees column is an array of structs with four named fields. It appears what you want is to write the data in a format that saves both the key and the value for each record.

One option is to write the file in JSON format instead of CSV:

dframe.write.json("junk.json")

Which produces the following output:

{"department":{"id":"123","name":"HR"},"employees":[{"firstName":"A","lastName":"AA","email":"mail1","salary":100000},{"firstName":"B","lastName":"BB","email":"mail2","salary":120000},{"firstName":"E","lastName":"EE","email":"mail5","salary":160000}]}
{"department":{"id":"456","name":"OPS"},"employees":[{"firstName":"C","email":"mail3","salary":140000},{"firstName":"D","lastName":"DD","email":"mail4","salary":160000}]}

Or if you wanted to keep it in CSV format, you can use to_json to convert each column to JSON before writing the CSV.

# looping over all columns
# but you can also just limit this to the columns you want to convert

from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
    .write.csv("junk_mycsv.csv")

This produces the following output:

"{\"id\":\"123\",\"name\":\"HR\"}","[{\"firstName\":\"A\",\"lastName\":\"AA\",\"email\":\"mail1\",\"salary\":100000},{\"firstName\":\"B\",\"lastName\":\"BB\",\"email\":\"mail2\",\"salary\":120000},{\"firstName\":\"E\",\"lastName\":\"EE\",\"email\":\"mail5\",\"salary\":160000}]"
"{\"id\":\"456\",\"name\":\"OPS\"}","[{\"firstName\":\"C\",\"email\":\"mail3\",\"salary\":140000},{\"firstName\":\"D\",\"lastName\":\"DD\",\"email\":\"mail4\",\"salary\":160000}]"

Note that the double-quotes are escaped.

这篇关于编写一个pyspark.sql.dataframe.DataFrame而不丢失信息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆