Append new data to partitioned parquet files


Question

I am writing an ETL process in which I need to read hourly log files, partition the data, and save it. I am using Spark (in Databricks). The log files are CSV, so I read them, apply a schema, and then perform my transformations.

My problem is: how can I save each hour's data in parquet format while appending to the existing data set? When saving, I need to partition by 4 columns present in the dataframe.

Here is my save line:

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)

The problem is that if the destination folder exists, the save throws an error. And if the destination doesn't exist, then I am not appending my files.

I've tried using .mode("append"), but I find that Spark sometimes fails midway through, so I end up losing track of how much of my data has been written and how much still needs to be written.

I am using parquet because the partitioning substantially speeds up my future queries. Also, I must write the data to disk as some file format and cannot use a database such as Druid or Cassandra.

Any suggestions for how to partition my dataframe and save the files (either sticking with parquet or another format) are greatly appreciated.

Recommended answer

If you need to append the files, you definitely have to use append mode. I don't know how many partitions you expect it to generate, but I find that if you have many partitions, partitionBy will cause a number of problems (memory and IO issues alike).
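For concreteness, here is a minimal sketch of the write from the question with append mode added (reusing the asker's data, validPartnerIds and saveDestination names):

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .mode("append")  // add to the existing dataset instead of failing when the destination exists
    .partitionBy("partnerID", "year", "month", "day")
    .parquet(saveDestination)

Note that append mode does not make the write atomic: as the question observes, a job that fails midway through can still leave a partially written dataset behind.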

If you think that your problem is caused by the write operations taking too long, I recommend that you try these two things (a short sketch showing where both settings go follows below):

1) Use snappy compression by adding it to the configuration:

conf.set("spark.sql.parquet.compression.codec", "snappy")

2) Disable generation of the metadata files in the hadoopConfiguration on the SparkContext, like this:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

The metadata files are somewhat time-consuming to generate (see this blog post), but according to this they are not actually important. Personally, I always disable them and have had no issues.
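As a minimal sketch of where both settings could be applied in a standalone Scala job (the app name is illustrative; in a Databricks notebook the SparkContext sc is already created for you, so there you would only call the hadoopConfiguration line on it directly):

import org.apache.spark.{SparkConf, SparkContext}

// 1) Ask Spark SQL to compress parquet output with snappy.
val conf = new SparkConf()
    .setAppName("hourly-log-etl")  // illustrative app name
    .set("spark.sql.parquet.compression.codec", "snappy")

val sc = new SparkContext(conf)

// 2) Skip the _metadata / _common_metadata summary files when writing parquet.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")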

If you generate many partitions (> 500), I'm afraid the best I can do is suggest that you look into a solution that does not use append mode - I simply never managed to get partitionBy to work with that many partitions.
