PySpark: fully cleaning checkpoints
Question
According to the documentation, it is possible to tell Spark to keep track of "out of scope" checkpoints - those that are no longer needed - and clean them from disk.
SparkSession.builder
    ...
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    .getOrCreate()
Apparently it does so; the problem, however, is that the last checkpointed RDDs are never deleted.
- Is there any configuration I am missing to perform a full clean-up?
- If there isn't: is there any way to get the name of the temporary folder created for a particular application, so I can programmatically delete it? I.e., get 0c514fb8-498c-4455-b147-aff242bd7381 from SparkContext the same way you can get the applicationId?
Answer
I know it's an old question, but recently I was exploring checkpoints and ran into similar problems. I would like to share my findings.
Question: Is there any configuration I am missing to perform a full clean-up?
Setting spark.cleaner.referenceTracking.cleanCheckpoints=true works sometimes, but it is hard to rely on. The official documentation says that by setting this property Spark will
clean checkpoint files if the reference is out of scope
I don't know what exactly that means, because my understanding is that once the Spark session/context is stopped, the checkpoints should be cleaned up.
However, I found an answer to your second question:
If there isn't: Is there any way to get the name of the temporary folder created for a particular application so I can programmatically delete it? I.e., get 0c514fb8-498c-4455-b147-aff242bd7381 from SparkContext the same way you can get the applicationId
Yes, we can get the checkpointed directory like below:
Scala:
// Set directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")
scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3
// It gives a String, so we can use org.apache.hadoop.fs to delete the path
PySpark:
# Set directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'
# Note the 'u' prefix: the call returns a unicode object
# Below are the steps to get the Hadoop FileSystem object and delete the path
>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
>>> fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True
>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True