How to rename the JSON file generated by PySpark?
Problem description
When I use
dataframe.coalesce(1).write.format('json')
on PySpark, I'm not able to change the name of the file in the partition.
I'm writing my JSON like this:
dataframe.coalesce(1).write.format('json').mode('overwrite').save('path')
but I can't change the name of the file in the partition.
I want a path like this:
/folder/my_name.json
where my_name.json is a JSON file.
Recommended answer
In Spark you can't control the name of the file written to the directory: save() treats the given path as a directory, and Spark writes its own files, with names starting with part-, inside it.
First write the data to the HDFS directory, then rename the file with the HDFS (Hadoop FileSystem) API.
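If the output directory is on the driver's local filesystem instead of HDFS (for example, Spark in local mode writing to a plain path), you can apply the same write-then-rename idea with Python's standard library. The sketch below assumes a local directory that contains exactly one part- file. move_part_file is a hypothetical helper name, and the approach does not work for HDFS, S3 or other remote filesystems:

```python
import shutil
from pathlib import Path


def move_part_file(out_dir: str, target: str) -> Path:
    """Move the single part- file Spark wrote into out_dir to target.

    Assumes out_dir is a local directory holding exactly one part- file,
    i.e. the DataFrame was written with coalesce(1).
    """
    # "part-*" does not match hidden checksum files such as .part-...crc
    parts = sorted(Path(out_dir).glob("part-*"))
    if len(parts) != 1:
        raise ValueError(f"expected one part- file in {out_dir}, found {len(parts)}")
    target_path = Path(target)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    # shutil.move also works when target is on a different local filesystem
    shutil.move(str(parts[0]), str(target_path))
    return target_path
```

For example, you could run df.coalesce(1).write.mode("overwrite").json("/tmp/spark_out") and then move_part_file("/tmp/spark_out", "/folder/my_name.json"). Writing to a scratch directory first means mode("overwrite") does not wipe the contents of the final folder.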
Example:
In PySpark:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

l = [("a", 1)]
ll = ["id", "sa"]
df = spark.createDataFrame(l, ll)
hdfs_dir = "/folder/"  # your HDFS directory (keep the trailing slash)
new_filename = "my_name.json"  # new file name

# coalesce(1) -> a single part- file; overwrite replaces everything already in hdfs_dir
df.coalesce(1).write.format("json").mode("overwrite").save(hdfs_dir)

# Hadoop FileSystem API through the py4j JVM gateway
Path = spark._jvm.org.apache.hadoop.fs.Path
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
# list files in the directory
list_status = fs.listStatus(Path(hdfs_dir))
# keep the name of the file that starts with part-
file_name = [f.getPath().getName() for f in list_status if f.getPath().getName().startswith("part-")][0]
# rename the file
fs.rename(Path(hdfs_dir + file_name), Path(hdfs_dir + new_filename))
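The list comprehension followed by [0] raises an IndexError when there is no part- file. It also silently picks one file when there are several, for example if coalesce(1) was left out. The sketch below shows a stricter selection step. pick_single_part_file is a hypothetical helper that works on plain file names, so you could call it as pick_single_part_file(s.getPath().getName() for s in list_status):

```python
from typing import Iterable


def pick_single_part_file(names: Iterable[str]) -> str:
    """Return the only name that starts with 'part-', else raise ValueError."""
    parts = [n for n in names if n.startswith("part-")]
    if len(parts) != 1:
        # zero files: nothing was written; several: the write was not coalesced
        raise ValueError(f"expected exactly one part- file, found {len(parts)}: {parts}")
    return parts[0]
```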
If you also want to delete the _SUCCESS file that Spark writes into the directory, use fs.delete.
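When the output directory is local rather than on HDFS, pathlib can handle the same clean-up. The sketch below assumes a local path. remove_spark_markers is a hypothetical helper that also deletes the hidden .crc checksum files (such as ._SUCCESS.crc) that Hadoop's local filesystem client typically writes next to output files. On HDFS, use fs.delete as described above.

```python
from pathlib import Path
from typing import List


def remove_spark_markers(out_dir: str) -> List[str]:
    """Delete _SUCCESS and hidden .crc checksum files in a local directory.

    Returns the sorted names of the files that were deleted.
    """
    removed = []
    # materialize the listing before deleting entries from the directory
    for p in list(Path(out_dir).iterdir()):
        is_success = p.name == "_SUCCESS"
        is_checksum = p.name.startswith(".") and p.name.endswith(".crc")
        if p.is_file() and (is_success or is_checksum):
            p.unlink()
            removed.append(p.name)
    return sorted(removed)
```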
In Scala:
import org.apache.hadoop.fs.{FileSystem, Path}
import spark.implicits._ // needed for toDF (imported automatically in spark-shell)
val df = Seq(("a", 1)).toDF("id", "sa")
df.show(false)
val hdfs_dir = "/folder/"
val new_filename = "new_json.json"
df.coalesce(1).write.mode("overwrite").format("json").save(hdfs_dir)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// pick the single part- file written by coalesce(1)
val f = fs.globStatus(new Path(s"${hdfs_dir}*")).map(_.getPath.getName).filter(_.startsWith("part-")).head
// rename it, then delete the _SUCCESS marker (false = non-recursive delete)
fs.rename(new Path(s"${hdfs_dir}${f}"), new Path(s"${hdfs_dir}${new_filename}"))
fs.delete(new Path(s"${hdfs_dir}_SUCCESS"), false)