How to efficiently update Impala tables whose files are modified very frequently
Problem description
We have a Hadoop-based solution (CDH 5.15) where we are getting new files in HDFS in some directories. On top of those directories we have 4-5 Impala (2.1) tables. The process writing those files to HDFS is Spark Structured Streaming (2.3.1).
Right now, we are running some DDL queries as soon as we get the files written to HDFS:
- ALTER TABLE table1 RECOVER PARTITIONS, to detect new partitions (and their HDFS directories and files) added to the table.
- REFRESH table1 PARTITION (partition1=X, partition2=Y), using all the keys for each partition.
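As a concrete illustration, the per-batch DDL can be generated from the partitions touched in that batch. This is only a sketch: the helper name is made up, and the simple single-quote escaping assumes string partition values (numeric keys would not need quotes).

```python
def build_batch_ddl(table, partitions):
    """Build the DDL statements run after each batch of files lands in HDFS.

    `partitions` is a list of dicts mapping partition column -> value,
    e.g. [{"partition1": "X", "partition2": "Y"}]. Illustrative helper,
    not the actual production code.
    """
    stmts = [f"ALTER TABLE {table} RECOVER PARTITIONS"]
    for keys in partitions:
        # One REFRESH per partition; Impala's PARTITION clause takes all keys.
        spec = ", ".join(f"{col}='{val}'" for col, val in keys.items())
        stmts.append(f"REFRESH {table} PARTITION ({spec})")
    return stmts

print(build_batch_ddl("table1", [{"partition1": "X", "partition2": "Y"}]))
```

With N new partitions per batch this emits 1 + N statements, which is exactly why the queue builds up as the file arrival rate grows.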
These DDL statements are taking a bit too long and are getting queued in our system, hurting the system's data availability.
So, my question is: Is there a way to do this data incorporation more efficiently?
We have considered:
- Using ALTER TABLE .. RECOVER PARTITIONS, but as per the documentation, it only refreshes new partitions.
- Using REFRESH .. PARTITION ... with multiple partitions at once, but the statement syntax does not allow that.
- Batching the queries, but the Hive JDBC driver does not support query batching.
- Should we try to run those updates in parallel, given that the system is already busy?
- Any other way you are aware of?
Thanks!
Victor
Note: The way we know which partitions need to be refreshed is by using HDFS events, since with Spark Structured Streaming we don't know exactly when the files are written.
Note #2: Also, the files written in HDFS are sometimes small, so it would be great if we could merge those files at the same time.
Since nobody seems to have an answer to my problem, I would like to share the approach we took to make this processing more efficient; comments are very welcome.
We discovered (the documentation is not very clear on this) that some of the information stored in the Spark "checkpoints" in HDFS is a set of metadata files describing when each Parquet file was written and how big it was:
$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata
-rw-r--r-- 3 hdfs 68K 2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
-rw-r--r-- 3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
-rw-r--r-- 3 hdfs 68K 2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...
$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...
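Each line after the `v1` version marker is a standalone JSON object, so extracting the path and size needs no Spark at all. A minimal parser, assuming the field names shown in the listing above (the function name and sample values are illustrative):

```python
import json

def parse_metadata(lines):
    """Parse the JSON lines of a _spark_metadata batch file.

    Skips the leading version marker ("v1") and directories, keeps only
    "add" actions, and returns (path, size) pairs.
    """
    files = []
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # version header ("v1") or blank line
        entry = json.loads(line)
        if entry.get("action") == "add" and not entry.get("isDir", False):
            files.append((entry["path"], entry["size"]))
    return files

sample = [
    "v1",
    '{"path":"hdfs://nn/job/part-00004.c000.snappy.parquet","size":9866555,'
    '"isDir":false,"modificationTime":1582750862638,"blockReplication":3,'
    '"blockSize":134217728,"action":"add"}',
]
print(parse_metadata(sample))
# [('hdfs://nn/job/part-00004.c000.snappy.parquet', 9866555)]
```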
So, what we did was:
- Build a Spark Streaming job that polls the _spark_metadata folder. We use fileStream, since it allows us to define the file filter to use.
- Each entry in that stream is one of those JSON lines, which is parsed to extract the file path and size.
- Group the files by the parent folder (which maps to each Impala partition) they belong to.
- For each folder:
- Read a dataframe loading only the targeted Parquet files (to avoid race conditions with the other job writing the files)
- Calculate how many blocks to write (using the size field in the JSON and a target block size)
- Coalesce the dataframe to the desired number of partitions and write it back to HDFS
- Execute the DDL REFRESH myTable PARTITION ([partition keys derived from the new folder])
- Finally, delete the source files
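The grouping and sizing logic in the steps above can be sketched without Spark. This is a simplified model, not our production code: the function name, folder layout, and sizes are illustrative, and in the real job the computed count is what gets passed to the DataFrame's coalesce() before writing back.

```python
import math
from collections import defaultdict
from posixpath import dirname  # HDFS paths use POSIX separators

def plan_compaction(files, target_block_size=128 * 1024 * 1024):
    """Group (path, size) pairs by parent folder (one Impala partition each)
    and compute how many output files to coalesce to per folder."""
    by_folder = defaultdict(list)
    for path, size in files:
        by_folder[dirname(path)].append(size)
    # At least one output file per folder; otherwise enough files so each
    # lands near the target block size.
    return {
        folder: max(1, math.ceil(sum(sizes) / target_block_size))
        for folder, sizes in by_folder.items()
    }

files = [
    ("hdfs://nn/tbl/day=1/part-0.parquet", 9_866_555),
    ("hdfs://nn/tbl/day=1/part-1.parquet", 526_513),
    ("hdfs://nn/tbl/day=2/part-0.parquet", 300_000_000),
]
print(plan_compaction(files))
# {'hdfs://nn/tbl/day=1': 1, 'hdfs://nn/tbl/day=2': 3}
```

The day=1 folder's two small files total ~10 MB, so they collapse into one output file; the 300 MB folder is split across three files of roughly a block each.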
What we achieved is:
- Limit the DDLs, by doing one refresh per partition and batch.
- By having batch time and block size configurable, we are able to adapt our product to different deployment scenarios with bigger or smaller datasets.
- The solution is quite flexible, since we can assign more or fewer resources to the Spark Streaming job (executors, cores, memory, etc.) and we can also start/stop it (using its own checkpointing system).
- We are also studying the possibility of applying some data repartitioning while doing this process, to get partitions as close as possible to the optimum size.