How to efficiently update Impala tables whose files are modified very frequently


Problem description


We have a Hadoop-based solution (CDH 5.15) where we are getting new files in HDFS in some directories. On top of those directories we have 4-5 Impala (2.1) tables. The process writing those files to HDFS is Spark Structured Streaming (2.3.1).

Right now, we are running some DDL queries as soon as we get the files written to HDFS (a short sketch of how these might be issued follows the list):

  • ALTER TABLE table1 RECOVER PARTITIONS to detect new partitions (and their HDFS directories and files) added to the table.

  • REFRESH table1 PARTITION (partition1=X, partition2=Y), using all the keys for each partition.
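
For reference, this is roughly what that step looks like if the two statements are issued from Python with the impyla client; the host, port and the partition1/partition2 values are placeholders, and impyla itself is only used here for illustration, not necessarily what our pipeline uses:

from impala.dbapi import connect   # impyla; illustration only

# Placeholder coordinator host/port, not the real deployment values.
conn = connect(host="impala-host.example.com", port=21050)
cur = conn.cursor()

# Detect partitions (and their HDFS directories and files) newly added under the table location.
cur.execute("ALTER TABLE table1 RECOVER PARTITIONS")

# Make the new files of one concrete partition visible; repeated for every partition key combination.
cur.execute("REFRESH table1 PARTITION (partition1='X', partition2='Y')")

cur.close()
conn.close()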

Right now, these DDL statements are taking too long and they are getting queued in our system, hurting the data availability of the system.

So, my question is: Is there a way to do this data incorporation more efficiently?

We have considered:

  • Using ALTER TABLE .. RECOVER PARTITIONS, but as per the documentation, it only refreshes new partitions.

  • Tried to use REFRESH .. PARTITION ... with multiple partitions at once, but the statement syntax does not allow that.

  • Tried batching the queries, but the Hive JDBC driver does not support batching.

  • Shall we try to do those updates in parallel given that the system is already busy?

  • Any other way you are aware of?

Thanks!

Victor

Note: The way we know which partitions need to be refreshed is by using HDFS events, since with Spark Structured Streaming we don't know exactly when the files are written.

Note #2: Also, the files written in HDFS are sometimes small, so it would be great if those files could be merged at the same time.

Solution

Since nobody seems to have the answer to my problem, I would like to share the approach we took to make this processing more efficient; comments are very welcome.

We discovered (the documentation is not very clear on this) that part of the information stored in the Spark "checkpoints" in HDFS is a set of metadata files describing when each Parquet file was written and how big it was:

$ hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata

-rw-r--r--   3 hdfs 68K   2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
-rw-r--r--   3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
-rw-r--r--   3 hdfs 68K   2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...

$ hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...
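
As an illustration, one of these batch files can be parsed with a few lines of Python: the first line is just a version marker ("v1") and every other line is a JSON document describing one written file. The helper name below is ours, not part of any Spark API:

import json

def parse_metadata_batch(lines):
    """Extract (path, size) for every file added in one _spark_metadata batch file."""
    entries = []
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):      # skips the "v1" version marker and blank lines
            continue
        doc = json.loads(line)
        if doc.get("action") == "add":
            entries.append((doc["path"], doc["size"]))
    return entries

# Example with one of the entries shown above (size is in bytes):
sample = [
    'v1',
    '{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,'
    '"isDir":false,"modificationTime":1582750862638,"blockReplication":3,'
    '"blockSize":134217728,"action":"add"}',
]
print(parse_metadata_batch(sample))   # [('hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet', 9866555)]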

So, what we did was the following (a condensed sketch follows the list):

  • Build a Spark Streaming Job polling that _spark_metadata folder.
    • We use a fileStream since it allows us to define the file filter to use.
    • Each entry in that stream is one of those JSON lines, which is parsed to extract the file path and size.
  • Group the files by the parent folder (which maps to each Impala partition) they belong to.
  • For each folder:
    • Read a dataframe loading only the targeted Parquet files (to avoid race conditions with the other job writing the files)
    • Calculate how many blocks to write (using the size field in the JSON and a target block size)
    • Coalesce the dataframe to the desired number of partitions and write it back to HDFS
    • Execute the DDL REFRESH myTable PARTITION ([partition keys derived from the new folder])
  • Finally, delete the source files
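
To make the per-folder step concrete, here is a condensed, non-streaming sketch in PySpark (with impyla for the REFRESH). The table name, host/port, target size and the Hive-style key=value folder layout are assumptions for illustration, the move of the compacted files into the partition folder and the deletion of the source files are elided, and the real job runs inside the Spark Streaming application described above:

import math
from pyspark.sql import SparkSession
from impala.dbapi import connect          # illustration only

spark = SparkSession.builder.appName("compact-and-refresh").getOrCreate()
impala = connect(host="impala-host.example.com", port=21050)   # placeholder coordinator

TARGET_SIZE = 128 * 1024 * 1024           # configurable target output file size, in bytes

def partition_spec(folder):
    # Assumes Hive-style partition folders like .../partition1=X/partition2=Y
    pairs = [seg.split("=", 1) for seg in folder.split("/") if "=" in seg]
    return ", ".join("{}='{}'".format(k, v) for k, v in pairs)

def compact_and_refresh(folder, files):
    """files: list of (path, size) tuples taken from _spark_metadata for one partition folder."""
    total_size = sum(size for _, size in files)
    num_files = max(1, math.ceil(total_size / TARGET_SIZE))

    # Read only the files listed in the metadata, to avoid races with the writer job.
    df = spark.read.parquet(*[path for path, _ in files])

    # Rewrite them as a few larger files into a staging location; moving them into the
    # partition folder and deleting the small source files is elided here.
    df.coalesce(num_files).write.mode("overwrite").parquet(folder.rstrip("/") + "_staging")

    # One REFRESH per partition, instead of one per incoming file ("my_table" is a placeholder).
    cur = impala.cursor()
    cur.execute("REFRESH my_table PARTITION ({})".format(partition_spec(folder)))
    cur.close()

The number of output files is derived only from the sizes recorded in the metadata files, so no extra HDFS listing is needed before the rewrite.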

What we achieved is:

  • Limit the number of DDL statements, by doing one refresh per partition and batch.

  • By having batch time and block size configurable, we are able to adapt our product to different deployment scenarios with bigger or smaller datasets.

  • The solution is quite flexible, since we can assign more or fewer resources to the Spark Streaming job (executors, cores, memory, etc.) and we can also start/stop it (using its own checkpointing system).

  • We are also studying the possibility of applying some data repartitioning during this process, to get the partitions as close as possible to the optimal size.
