How to rename my JSON generated by PySpark?


Question

When I use

dataframe.coalesce(1).write.format('json')

in PySpark, I am not able to change the name of the file in the partition.

I am writing my JSON like this:

dataframe.coalesce(1).write.format('json').mode('overwrite').save('path')

But I am not able to change the name of the file in the partition.

I want a path like this:

/folder/my_name.json

where 'my_name.json' is a JSON file.

Recommended answer

In Spark, we can't control the name of the file written to the directory.

First write the data to the HDFS directory. Then, to change the name of the file, use the HDFS API.

Example:

In PySpark:

l=[("a",1)]
ll=["id","sa"]
df=spark.createDataFrame(l,ll)

hdfs_dir = "/folder/" #your hdfs directory
new_filename="my_name.json" #new filename

df.coalesce(1).write.format("json").mode("overwrite").save(hdfs_dir)

# Hadoop classes reached through the py4j gateway
Path = spark._jvm.org.apache.hadoop.fs.Path
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

# list the files in the directory
list_status = fs.listStatus(Path(hdfs_dir))

# pick the file whose name starts with part- (coalesce(1) writes exactly one)
file_name = [f.getPath().getName() for f in list_status if f.getPath().getName().startswith('part-')][0]

# rename the file
fs.rename(Path(hdfs_dir + file_name), Path(hdfs_dir + new_filename))

If you also want to remove the success marker from the directory, use fs.delete to delete the _SUCCESS file.
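As a minimal sketch of that cleanup step in PySpark: the helper name `delete_success_file` and its parameters are hypothetical, and it reaches the Hadoop `FileSystem` through `spark._jvm` and `spark._jsc` in the same way as the rename example above.

```python
def delete_success_file(spark, hdfs_dir):
    """Delete the _SUCCESS marker in hdfs_dir.

    Hypothetical helper: `spark` is an active SparkSession and `hdfs_dir`
    is the directory that was passed to save(). Returns True if the marker
    was deleted, False if it was not there.
    """
    jvm = spark._jvm
    Path = jvm.org.apache.hadoop.fs.Path
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

    # build "<dir>/_SUCCESS" whether or not hdfs_dir ends with a slash
    success_path = Path(hdfs_dir.rstrip("/") + "/_SUCCESS")

    if fs.exists(success_path):
        # second argument is `recursive`; False is enough for a single file
        return fs.delete(success_path, False)
    return False
```

Calling `delete_success_file(spark, hdfs_dir)` after the rename leaves only `my_name.json` in the directory.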

In Scala:

val df=Seq(("a",1)).toDF("id","sa")
df.show(false)

import org.apache.hadoop.fs._

val hdfs_dir = "/folder/"
val new_filename="new_json.json"

df.coalesce(1).write.mode("overwrite").format("json").save(hdfs_dir)

val fs=FileSystem.get(sc.hadoopConfiguration)
val f=fs.globStatus(new Path(s"${hdfs_dir}" + "*")).filter(x => x.getPath.getName.toString.startsWith("part-")).map(x => x.getPath.getName).mkString

fs.rename(new Path(s"${hdfs_dir}${f}"),new Path(s"${hdfs_dir}${new_filename}"))

// delete the _SUCCESS marker; the one-argument delete is deprecated, so pass recursive = false
fs.delete(new Path(s"${hdfs_dir}_SUCCESS"), false)
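Both examples above go through the Hadoop API. If the output directory happens to be on the driver's local filesystem rather than HDFS, the same "find the part- file, then rename it" step can be sketched with the Python standard library alone. This is a different technique from the answer's, shown only for that case; the function name `rename_single_part_file` and its parameters are hypothetical.

```python
from pathlib import Path


def rename_single_part_file(output_dir, new_name):
    """Rename the only part- file in output_dir to new_name.

    Assumes output_dir is a local directory written with coalesce(1),
    so exactly one file whose name starts with "part-" is expected.
    Returns the new path.
    """
    directory = Path(output_dir)

    # hidden checksum files start with "." and are therefore not matched
    parts = [p for p in directory.iterdir() if p.name.startswith("part-")]
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part- file, found {len(parts)}")

    target = directory / new_name
    # Path.replace overwrites target if it already exists
    parts[0].replace(target)
    return target
```

For example, `rename_single_part_file("/folder", "my_name.json")` would produce `/folder/my_name.json`, and it raises `ValueError` when the directory does not contain exactly one part- file.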
