Can underlying parquet files be deleted without negatively impacting DeltaLake _delta_log
Question
Using .vacuum() on a DeltaLake table is very slow (see Delta Lake (OSS) Table on EMR and S3 - Vacuum takes a long time with no jobs).
If I manually deleted the underlying parquet files and did not add a new json log file or add a new .checkpoint.parquet file and change the _delta_log/_last_checkpoint file that points to it; what would the negative impacts to the DeltaLake table be, if any?
Obviously time-traveling, i.e. loading a previous version of the table that relied on the parquet files I removed, would not work. What I want to know is, would there be any issues reading, writing, or appending to the current version of the DeltaLake table?
What I am thinking of doing in pySpark:
### Assuming a working SparkSession as `spark`
from subprocess import check_output
import json
from pyspark.sql import functions as F

# Read the latest checkpoint version from _last_checkpoint and zero-pad
# it to the 20 digits used in the log file names
awscmd = "aws s3 cp s3://my_s3_bucket/delta/_delta_log/_last_checkpoint -"
last_checkpoint = str(json.loads(check_output(awscmd, shell=True).decode("utf-8")).get('version')).zfill(20)

s3_bucket_path = "s3a://my_s3_bucket/delta"

# Collect the `remove` actions (tombstones) from the checkpoint whose
# deletion timestamp is older than the 7-day retention window
df_chkpt_del = (
    spark.read.format("parquet")
    .load(f"{s3_bucket_path}/_delta_log/{last_checkpoint}.checkpoint.parquet")
    .where(F.col("remove").isNotNull())
    .select("remove.*")
    .withColumn("deletionTimestamp", F.from_unixtime(F.col("deletionTimestamp") / 1000))
    .withColumn("delDateDiffDays", F.datediff(F.col("deletionTimestamp"), F.current_timestamp()))
    .where(F.col("delDateDiffDays") < -7)
)
There are a lot of options from here. One could be:
df_chkpt_del.select("path").toPandas().to_csv("files_to_delete.csv", index=False)
Where I could read files_to_delete.csv into a bash array and then use a simple bash for loop, passing each parquet file s3 path to an aws s3 rm command to remove the files one by one.
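A minimal Python sketch of that delete loop (assuming files_to_delete.csv has the single path header that the toPandas().to_csv() call above writes, and that the paths recorded in the checkpoint are relative to the table root — both assumptions worth verifying against your own log):

```python
import csv

def build_rm_commands(csv_path, bucket="my_s3_bucket"):
    """Read the 'path' column written by to_csv() above and build one
    'aws s3 rm' command string per parquet file. Paths from the Delta
    log are assumed relative to the table root (delta/)."""
    commands = []
    with open(csv_path, newline="") as f:
        for row in csv.DictReader(f):
            commands.append(f"aws s3 rm s3://{bucket}/delta/{row['path']}")
    return commands
```

Each command could then be executed with subprocess.run(cmd.split()), or, for fewer round trips, the same paths could be passed in batches to boto3's delete_objects instead of shelling out.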
This may be slower than vacuum(), but at least it will not be consuming cluster resources while it is working.
If I do this, will I also have to either:
- write a new _delta_log/000000000000000#####.json file that correctly documents these changes?
- write a new 000000000000000#####.checkpoint.parquet file that correctly documents these changes and change the _delta_log/_last_checkpoint file to point to that checkpoint.parquet file?
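For the first option, each line of a Delta commit file is one JSON action, and removed files are recorded as remove actions. A minimal sketch of building such lines follows; the field names follow the open-source Delta transaction log protocol, but whether dataChange should be False here, and whether such a commit is even valid for files that were already tombstoned in an earlier version, would need to be checked against the protocol spec:

```python
import json
import time

def build_remove_commit(paths):
    """Build the lines of a hypothetical _delta_log commit file that
    records the removal of the given data files. Each line is one JSON
    action; the 'remove' action carries the file path, a deletion
    timestamp in milliseconds, and a dataChange flag (False assumed
    here, since no logical data changes)."""
    now_ms = int(time.time() * 1000)
    return [
        json.dumps({"remove": {"path": p,
                               "deletionTimestamp": now_ms,
                               "dataChange": False}})
        for p in paths
    ]
```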
The second option would be easier. However, if there will be no negative effects if I just remove the files and don't change anything in the _delta_log, then that would be the easiest.
Answer
TLDR. Answering this question:

If I manually deleted the underlying parquet files and did not add a new json log file or add a new .checkpoint.parquet file and change the _delta_log/_last_checkpoint file that points to it; what would the negative impacts to the DeltaLake table be, if any?
Yes, this could potentially corrupt your delta table.
Let me briefly explain how Delta Lake reads a version using the _delta_log.
If you want to read version x, it will go through the delta logs of all versions from 1 to x-1 and build a running sum of the parquet files to read. A summary of this process is saved as a .checkpoint after every 10th version to keep this running sum efficient.
Assume:

version 1 log says: add file_1, file_2, file_3
version 2 log says: delete file_1, file_2, and add file_4

So when reading version 2, the combined instruction will be:

add file_1, file_2, file_3 -> delete file_1, file_2, add file_4

So, the resultant files read will be file_3 and file_4.
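The running-sum replay described above can be sketched as a small fold over per-version actions (simplified here to plain sets of paths; the real log entries are full JSON add/remove actions):

```python
def replay(versions):
    """Apply each version's remove/add actions in order and return the
    set of parquet files a reader of the latest version must load."""
    live = set()
    for actions in versions:
        live -= set(actions.get("remove", []))
        live |= set(actions.get("add", []))
    return live

log = [
    {"add": ["file_1", "file_2", "file_3"]},              # version 1
    {"remove": ["file_1", "file_2"], "add": ["file_4"]},  # version 2
]
# replay(log) → {"file_3", "file_4"}
```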
Say in version 3, you delete file_4 from the file system. If you don't use .vacuum, then the delta log will not know that file_4 is not present; it will try to read it and will fail.