Processing upserts on a large number of partitions is not fast enough

Problem Description

Problem

We have a Delta Lake setup on top of ADLS Gen2 with the following tables:

  • bronze.DeviceData: partitioned by arrival date (Partition_Date)
  • silver.DeviceData: partitioned by event date and hour (Partition_Date and Partition_Hour)

We ingest large amounts of data (>600M records per day) from an event hub into bronze.DeviceData (append-only). We then process the new files in a streaming fashion and upsert them into silver.DeviceData with the delta MERGE command (see below).

The data arriving in the bronze table can contain data from any partition in silver (e.g. a device may send historic data that it cached locally). However, >90% of the data arriving at any day is from partitions Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS). Therefore, to upsert the data, we have the following two spark jobs:

  • "Fast": processes data from the three date partitions above. Latency is important here, so we prioritize this data
  • "Slow": processes the rest (anything other than those three date partitions). Latency is not as important, but it should stay within a "reasonable" amount of time (I'd say no more than a week). A minimal sketch of this split is shown below.
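
A minimal sketch of how that split could look, assuming the bronze micro-batch is available as a DataFrame; the helper name splitByRecency and the routing itself are illustrative, not the question's actual jobs:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, current_date, date_add, date_sub}

// Illustrative helper: route one bronze micro-batch to the "fast" or the "slow" path.
// The real setup runs two separate streaming jobs; this only shows the predicate.
def splitByRecency(bronzeBatch: DataFrame): (DataFrame, DataFrame) = {
  // >90% of arriving data has an event date of yesterday, today or tomorrow
  val isRecent = col("Partition_Date") >= date_sub(current_date(), 1) &&
    col("Partition_Date") <= date_add(current_date(), 1)
  val fastBatch = bronzeBatch.filter(isRecent)   // low-latency path, three date partitions
  val slowBatch = bronzeBatch.filter(!isRecent)  // catch-up path, potentially many old partitions
  (fastBatch, slowBatch)
}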

Now we come to the problem: although the amount of data is magnitudes less in the "slow" job, it runs for days just to process a single day of slow bronze data, with a big cluster. The reason is simple: it has to read and update many silver partitions (> 1000 date partitions at times), and since the updates are small but the date partitions can be gigabytes, these merge commands are inefficient.

Furthermore, as time goes on, this slow job will become slower and slower, since the silver partitions it touches will grow.

Questions

  1. Is our partitioning scheme and the fast/slow Spark job setup generally a good way to approach this problem?
  2. What could be done to improve this setup? We would like to reduce the cost and latency of the slow job, and find a way for it to grow with the amount of data arriving per day in bronze rather than with the size of the silver table

Additional information

  • We need the MERGE command, because some upstream services can re-process historical data, which should then also update the silver table
  • Schema of the silver table:
CREATE TABLE silver.DeviceData (
  DeviceID LONG NOT NULL, -- the ID of the device that sent the data
  DataType STRING NOT NULL, -- the type of data it sent
  Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
  Value DOUBLE NOT NULL, -- the value that the device sent
  UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
  Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
  Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'

  • Our MERGE command:
    val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)
    
    val batch = ... // the streaming update batch
    
    // the dates and hours that we want to upsert, for partition pruning
    // collected from the streaming update batch
    val dates = "..."
    val hours = "..."
    
    val mergeCondition = s"""
      silver.Partition_Date IN ($dates)
      AND silver.Partition_Hour IN ($hours)
      AND silver.Partition_Date = batch.Partition_Date
      AND silver.Partition_Hour = batch.Partition_Hour
      AND silver.DeviceID = batch.DeviceID
      AND silver.Timestamp = batch.Timestamp
      AND silver.DataType = batch.DataType
    """
    
    silverTable.alias("silver")
      .merge(batch.alias("batch"), mergeCondition)
      // only merge if the event is newer
      .whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
      .whenNotMatched.insertAll
      .execute
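
The question leaves out how $dates and $hours are built. Purely as an illustration, one way to collect them from the micro-batch could look like the hypothetical helper below (not part of the original code):

import org.apache.spark.sql.DataFrame

// Hypothetical helper: build the IN-list literals from the partitions present in the batch,
// so the merge condition above prunes to exactly those silver partitions.
def partitionLiterals(batch: DataFrame): (String, String) = {
  val parts = batch.select("Partition_Date", "Partition_Hour").distinct().collect()
  val dates = parts.map(r => s"'${r.getAs[java.sql.Date]("Partition_Date")}'").distinct.mkString(", ")
  val hours = parts.map(r => r.getAs[Int]("Partition_Hour").toString).distinct.mkString(", ")
  (dates, hours)
}

// val (dates, hours) = partitionLiterals(batch)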
    

Recommended Answer

On Databricks, there are several ways to optimize the performance of the MERGE INTO operation:

  • Perform OPTIMIZE with ZORDER on the columns that are part of the join condition. This may depend on the specific DBR version: older versions (prior to 7.6, IIRC) used a true Z-order algorithm that works well for a smaller number of columns, while DBR 7.6+ uses Hilbert space-filling curves by default
  • Use smaller file sizes. By default, OPTIMIZE creates 1 GB files that then need to be rewritten; you can use spark.databricks.delta.optimize.maxFileSize to set the file size to the 32 MB-64 MB range so that less data is rewritten
  • Use conditions on the partitions of the table (you're already doing that)
  • Don't use auto-compaction, because it can't apply ZORDER; run an explicit OPTIMIZE with ZORDER instead. See the documentation for details
  • Tune the indexing of the columns so that statistics are collected only for the columns required by your conditions and queries. This is only partially related to the merge, but it can slightly improve write speed, since no statistics are collected for columns that aren't used in queries. (A sketch of these commands and settings follows this list.)
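
A sketch of what these suggestions could look like in practice; the table name and the ZORDER columns come from the question, while the 64 MB target, the 7-day window and the choice of 5 indexed columns are assumptions, not recommendations from the answer:

// Smaller target file size for OPTIMIZE (assumption: 64 MB), so merges rewrite less data.
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 64L * 1024 * 1024)

// Explicit OPTIMIZE with ZORDER on the join-condition columns; the WHERE clause limits the
// run to recent partitions and is purely illustrative.
spark.sql("""
  OPTIMIZE silver.DeviceData
  WHERE Partition_Date >= current_date() - INTERVAL 7 DAYS
  ZORDER BY (DeviceID, Timestamp, DataType)
""")

// Disable auto-compaction for this table (it cannot apply ZORDER) and collect statistics only
// for the leading columns used in conditions and queries (assumption: the first 5 columns).
spark.sql("""
  ALTER TABLE silver.DeviceData SET TBLPROPERTIES (
    'delta.autoOptimize.autoCompact' = 'false',
    'delta.dataSkippingNumIndexedCols' = '5'
  )
""")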

This presentation from Spark Summit talks about optimizing MERGE INTO: what metrics to watch, and so on.
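
One place to see those metrics is the table's commit history; a small sketch, reusing the DeltaTable handle and silverDeltaLakeDirectory from the question's code:

import io.delta.tables.DeltaTable

val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)

// The most recent commits with their operation metrics; MERGE commits report values such as
// numTargetRowsUpdated, numTargetRowsInserted and numTargetFilesAdded/Removed.
silverTable.history(10)
  .select("version", "timestamp", "operation", "operationMetrics")
  .show(truncate = false)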

I'm not 100% sure that you need the condition silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours), because you may read more data than required if you don't have specific partitions in the incoming data, but checking that requires a look at the execution plan. This knowledge base article explains how to make sure that MERGE INTO uses partition pruning.
