Spark dataframe checkpoint cleanup


Problem description

I have a dataframe in Spark into which an entire partition from Hive has been loaded, and I need to break the lineage to overwrite the same partition after some modifications to the data. However, when the Spark job is done I am left with the checkpoint data on HDFS. Why does Spark not clean this up by itself, or is there something I am missing?

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Load the single Hive partition that will be overwritten
val df = spark.table("db.my_table").filter(col("partition").equalTo(2))

// ... transformations to the dataframe

// Break the lineage so the source partition can be overwritten safely
val checkpointDf = df.checkpoint()
checkpointDf.write.format("parquet").mode(SaveMode.Overwrite).insertInto("db.my_table")

After this I have this file on HDFS:

/home/user/checkpoint/214797f2-ce2e-4962-973d-8f215e5d5dd8/rdd-23/part-00000

And each time I run the Spark job I get a new directory with a new unique ID, containing files for each RDD that was checkpointed from the dataframe.

Solution

Spark has an implicit mechanism for cleaning up checkpoint files.

Add this property to spark-defaults.conf:

spark.cleaner.referenceTracking.cleanCheckpoints  true #Default is false

You can find more about Spark configuration on the official Spark configuration page.
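
If editing spark-defaults.conf is not convenient, the same property can also be supplied when the SparkSession is built; it has to be in place before the SparkContext starts, so calling spark.conf.set afterwards is too late. A minimal sketch in Scala, with the application name as an illustrative assumption rather than something from the question:

import org.apache.spark.sql.SparkSession

// The cleaner flag is read when the SparkContext is created,
// so it must be passed at session construction time.
val spark = SparkSession.builder()
  .appName("checkpoint-cleanup-example") // illustrative name, not from the question
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .enableHiveSupport()
  .getOrCreate()

// Same checkpoint directory as in the question
spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")

Keep in mind that this cleaner is reference-tracking based: a checkpoint is removed only after the RDD that references it has been garbage collected, so files can still be left behind if the application exits before that happens.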

If you want to remove the checkpoint directory from HDFS yourself, you can do it with Python; at the end of your script you could use a command such as rmtree.
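
Because the rest of the code in the question is Scala, another option is to delete the checkpoint directory from the same job through the Hadoop FileSystem API instead of an external Python step. This is a rough sketch under that assumption, reusing the checkpoint path from the question:

import org.apache.hadoop.fs.{FileSystem, Path}

// Recursively delete the checkpoint directory once the job is done with it.
val checkpointPath = new Path("/home/user/checkpoint/") // same dir passed to setCheckpointDir
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
if (fs.exists(checkpointPath)) {
  fs.delete(checkpointPath, true) // true = recursive
}

Run this only after the final write has completed, since the checkpointed RDD's files are still needed while the overwrite of db.my_table is in progress.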

With spark.cleaner.referenceTracking.cleanCheckpoints set to true, the cleaner is allowed to remove old checkpoint files inside the checkpoint directory.
