Can underlying parquet files be deleted without negatively impacting DeltaLake _delta_log

Question

Using .vacuum() on a DeltaLake table is very slow (see Delta Lake (OSS) Table on EMR and S3 - Vacuum takes a long time with no jobs).

If I manually deleted the underlying parquet files, and did not add a new json log file or add a new .checkpoint.parquet file and change the _delta_log/_last_checkpoint file that points to it, what would the negative impacts on the DeltaLake table be, if any?

Obviously time-traveling, i.e. loading a previous version of the table that relied on the parquet files I removed, would not work. What I want to know is, would there be any issues reading, writing, or appending to the current version of the DeltaLake table?

What I am thinking of doing in pySpark:

### Assuming a working SparkSession as `spark`

from subprocess import check_output
import json
from pyspark.sql import functions as F

# Read _delta_log/_last_checkpoint to find the latest checkpoint version,
# zero-padded to 20 digits to match the checkpoint file name.
awscmd = "aws s3 cp s3://my_s3_bucket/delta/_delta_log/_last_checkpoint -"
last_checkpoint = str(json.loads(check_output(awscmd, shell=True).decode("utf-8")).get('version')).zfill(20)

s3_bucket_path = "s3a://my_s3_bucket/delta/"

# Load the checkpoint parquet, keep only the `remove` actions, convert the
# epoch-millisecond deletionTimestamp, and keep files removed more than 7 days ago.
df_chkpt_del = (
    spark.read.format("parquet")
    .load(f"{s3_bucket_path}/_delta_log/{last_checkpoint}.checkpoint.parquet")
    .where(F.col("remove").isNotNull())
    .select("remove.*")
    .withColumn("deletionTimestamp", F.from_unixtime(F.col("deletionTimestamp")/1000))
    .withColumn("delDateDiffDays", F.datediff(F.col("deletionTimestamp"), F.current_timestamp()))
    .where(F.col("delDateDiffDays") < -7)
)

There are a lot of options from here. One could be:

df_chkpt_del.select("path").toPandas().to_csv("files_to_delete.csv", index=False)

Where I could read files_to_delete.csv into a bash array and then use a simple bash for loop, passing each parquet file's s3 path to an aws s3 rm command to remove the files one by one.
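
For completeness, here is a minimal sketch of that same loop in Python rather than bash. It assumes the paths recorded in the checkpoint's remove actions, and hence in files_to_delete.csv, are relative to the table root, as they normally are in the Delta log:

# Minimal sketch: delete each path listed in files_to_delete.csv with the aws CLI.
import csv
import subprocess

table_root = "s3://my_s3_bucket/delta/"

with open("files_to_delete.csv") as f:
    for row in csv.DictReader(f):
        # One `aws s3 rm` call per file, the same as the bash for loop described above.
        subprocess.run(["aws", "s3", "rm", table_root + row["path"]], check=True)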

This may be slower than vacuum(), but at least it will not be consuming cluster resources while it is working.

If I do this, will I also have to either:

  1. write a new _delta_log/000000000000000#####.json file that correctly documents these changes?
  2. write a new 000000000000000#####.checkpoint.parquet file that correctly documents these changes, and change the _delta_log/_last_checkpoint file to point to that checkpoint.parquet file?

The second option would be easier.

However, if there will be no negative effects if I just remove the files and don't change anything in the _delta_log, then that would be the easiest.

Answer

TL;DR. Answering this question:

If I manually deleted the underlying parquet files, and did not add a new json log file or add a new .checkpoint.parquet file and change the _delta_log/_last_checkpoint file that points to it, what would the negative impacts on the DeltaLake table be, if any?

Yes, this could potentially corrupt your delta table.

Let me briefly explain how delta-lake reads a version using the _delta_log.

If you want to read version x, it will go through the delta log of all versions from 1 to x-1 and build up a running sum of the parquet files to read. A summary of this process is saved as a .checkpoint after every 10th version to keep this running sum efficient.
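
To make this concrete, here is a rough sketch (reusing spark and F from the question's snippet) of inspecting the per-version commit files; each _delta_log/<version>.json is newline-delimited JSON whose records carry, among other things, add and remove actions:

# Rough sketch: list the add/remove actions recorded by each commit file.
log_df = spark.read.json("s3a://my_s3_bucket/delta/_delta_log/*.json")
(
    log_df
    .select(
        F.input_file_name().alias("commit_file"),
        F.col("add.path").alias("added"),
        F.col("remove.path").alias("removed"),
    )
    .where(F.col("added").isNotNull() | F.col("removed").isNotNull())
    .show(truncate=False)
)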

Assume:
version 1 log says: add file_1, file_2, file_3
version 2 log says: delete file_1, file_2, and add file_4

So when reading version 2, the total instruction will be: add file_1, file_2, file_3 -> delete file_1, file_2, and add file_4

So, the resultant files read will be file_3 and file_4.

Say in version 3 you delete file_4 from the file system. If you don't use .vacuum, the delta log will not know that file_4 is no longer present; it will try to read it and will fail.
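
A hedged illustration of that failure mode (again reusing spark and F, and assuming all of the JSON commits are still present so checkpoints can be ignored): replaying the add/remove actions yields the set of files the current version still references, and it is exactly those files, rather than the ones listed under remove in the question's snippet, that must not be deleted by hand:

# Hedged sketch: replay add/remove actions to find files still referenced by the
# current version. Deleting any of these from S3 is what breaks subsequent reads.
log_df = spark.read.json("s3a://my_s3_bucket/delta/_delta_log/*.json")

added = {r["path"] for r in log_df.where(F.col("add").isNotNull()).select("add.path").collect()}
removed = {r["path"] for r in log_df.where(F.col("remove").isNotNull()).select("remove.path").collect()}

live_files = added - removed  # the "running sum": still referenced, must NOT be deleted
print(f"{len(live_files)} data files are still referenced by the current version")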
