UPSERT in parquet Pyspark


Problem description

I have parquet files in S3 with the following partition layout: year / month / date / some_id. Using Spark (PySpark), each day I would like to UPSERT the last 14 days: replace the existing data in S3 (one parquet file per partition) without deleting the days that are older than 14 days. I tried two save modes: append is no good because it just adds another file, and overwrite deletes the past data as well as the data for other partitions.

Is there any way or best practice to overcome this? Should I read all the data from S3 on each run and write it back again? Or maybe rename the files so that append replaces the current file in S3?

Thanks a lot!

Recommended answer

I usually do something similar. In my case I run an ETL and append one day of data to a parquet file:

The key is to work only with the data you want to write (in my case the current date), make sure to partition by the date column, and overwrite all the data for that date.

This will preserve all the old data. As an example:

# Note: "replaceWhere" expects a SQL predicate and is only honored by the
# Delta Lake writer (see delta.io below); the plain parquet writer ignores it.
(
    sdf
    .write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("date")
    .option("replaceWhere", "date = '2020-01-27'")
    .save(uri)
)
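
If you stay with plain parquet, a similar partition-level upsert can be achieved with Spark's dynamic partition overwrite mode (available since Spark 2.3): with mode("overwrite"), only the partitions present in the dataframe being written are replaced, and older partitions are left untouched. A minimal sketch, assuming sdf already holds only the last 14 days and uri points to the existing partitioned dataset:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Replace only the partitions present in the dataframe being written;
# all other partitions under the target path are kept as they are.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# `sdf` is assumed to hold only the last 14 days, partitioned by `date`.
(
    sdf
    .write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("date")
    .save(uri)
)

In newer Spark versions the same behaviour can also be set per write with .option("partitionOverwriteMode", "dynamic").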

You could also take a look at delta.io, which is an extension of the parquet format that adds some interesting features such as ACID transactions.
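
For reference, with Delta Lake the upsert can be expressed as an explicit MERGE instead of a partition overwrite. A sketch, assuming the delta-spark package is installed, the target already exists in Delta format at uri, and updates holds the last 14 days keyed by date and some_id (column names taken from the question):

from delta.tables import DeltaTable

# Target table, assumed to already exist in Delta format at `uri`.
target = DeltaTable.forPath(spark, uri)

(
    target.alias("t")
    .merge(
        updates.alias("u"),
        "t.date = u.date AND t.some_id = u.some_id",
    )
    .whenMatchedUpdateAll()      # rows that already exist are replaced
    .whenNotMatchedInsertAll()   # new rows are inserted
    .execute()
)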
