Append new data to partitioned parquet files


Question

I am writing an ETL process where I will need to read hourly log files, partition the data, and save it. I am using Spark (in Databricks). The log files are CSV so I read them and apply a schema, then perform my transformations.

My problem is, how can I save each hour's data in parquet format but append to the existing data set? When saving, I need to partition by 4 columns present in the dataframe.

Here is my save line:

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)

The problem is that if the destination folder exists the save throws an error. If the destination doesn't exist then I am not appending my files.

I've tried using .mode("append"), but I find that Spark sometimes fails midway through, so I end up losing track of how much of my data has been written and how much still needs to be written.

I am using parquet because the partitioning substantially speeds up my future queries. Also, I must write the data to disk in some file format and cannot use a database such as Druid or Cassandra.

Any suggestions for how to partition my dataframe and save the files (either sticking to parquet or another format) are greatly appreciated.

Answer

If you need to append the files, you definitely have to use the append mode. I don't know how many partitions you expect it to generate, but I find that if you have many partitions, partitionBy will cause a number of problems (memory- and IO-issues alike).
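
For reference, a minimal sketch of the question's save line with append mode added (saveDestination and validPartnerIds come from the question, so this is illustrative rather than a drop-in fix):

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .mode("append") // append to the existing dataset instead of failing when the folder exists
    .partitionBy("partnerID", "year", "month", "day")
    .parquet(saveDestination)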

If you think that your problem is caused by write operations taking too long, I recommend that you try these two things:

1) Use snappy by adding to the configuration:

conf.set("spark.sql.parquet.compression.codec", "snappy")

2) Disable generation of the metadata files in the hadoopConfiguration on the SparkContext like this:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

The metadata files are somewhat time-consuming to generate (see this blog post), but according to this they are not actually important. Personally, I always disable them and have no issues.
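
Putting the two suggestions together, here is a hedged sketch of applying both settings before the write; it assumes the pre-created sc and sqlContext handles that a Databricks (Spark 1.x-era) notebook exposes:

// write parquet with the snappy codec instead of the default
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// skip the _metadata / _common_metadata summary files on write
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

Both values can also be set once in the cluster configuration instead of per job (the Hadoop key via the spark.hadoop. prefix).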

If you generate many partitions (> 500), I'm afraid the best I can do is suggest to you that you look into a solution not using append-mode - I simply never managed to get partitionBy to work with that many partitions.
