How do I compute my Foundry 'latest version' dataset faster?

Problem Description

I have a dataset ingesting the latest edits to rows of my data, but it only ingests the recently edited version. (i.e. it's incremental on an update_ts timestamp column).

Original table:

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |

Table after the update:

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |
| key_1       | 1         |
| key_2       | 1         |
| key_1       | 2         |

After ingestion, I need to compute the 'latest version' for all prior updates while also taking into account any new edits.

This means I am taking the incremental ingest and running a SNAPSHOT output each time. This is very slow for my build as I've noticed I have to look over all my output rows every time I want to compute the latest version for my data.

Transaction n=1 (SNAPSHOT):

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |

Transaction n=2 (APPEND):

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 1         |
| key_2       | 1         |
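
For concreteness, the slow recompute described above looks roughly like this (a sketch only: the dataset paths are the ones used in the answer below, and the window-based dedup is one reasonable way to derive the latest row per key):

from transforms.api import transform, Input, Output
from pyspark.sql import functions as F, Window


@transform(
    my_output=Output("/datasets/clean_dataset"),
    my_input=Input("/datasets/raw_dataset")
)
def recompute_latest(my_input, my_output):
    # Read the *entire* raw dataset every build; this is the slow part.
    df = my_input.dataframe()

    # Keep only the newest edit per primary key.
    w = Window.partitionBy("primary_key").orderBy(F.col("update_ts").desc())
    latest = (
        df.withColumn("row_num", F.row_number().over(w))
          .where(F.col("row_num") == 1)
          .drop("row_num")
    )

    # Rewrite the whole output as a SNAPSHOT, shuffling every row each time.
    my_output.write_dataframe(latest)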

How can I make this 'latest version' computation faster?

Answer

This is a common pattern that will benefit from bucketing.

The gist of this is: write your output SNAPSHOT into buckets based on your primary_key column, so that the expensive step of shuffling your much, much larger output is skipped entirely.

This means you will only have to exchange your new data to the buckets that already contain your prior history.
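
As a toy illustration of why this helps (plain Spark outside Foundry, with made-up table and dataframe names): once the large side of the join is persisted in buckets on the join key, only the small batch of new edits has to be shuffled.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The large, previously computed 'latest version' table.
history = spark.createDataFrame(
    [("key_1", 0), ("key_2", 0), ("key_3", 0)], ["primary_key", "update_ts"]
)
# The small batch of newly ingested edits.
new_edits = spark.createDataFrame(
    [("key_1", 1), ("key_2", 1)], ["primary_key", "update_ts"]
)

# One-time cost: write the history bucketed on the join key.
(history.write
    .mode("overwrite")
    .bucketBy(4, "primary_key")
    .sortBy("update_ts")
    .saveAsTable("clean_history"))

# At merge time the bucketed side is read in place; typically only the
# small new-edits side picks up an Exchange in the physical plan.
merged = new_edits.join(spark.table("clean_history"), "primary_key", "full_outer")
merged.explain()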

Let's start from the initial state, where we are running on a prior-computed 'latest' version that was a slow SNAPSHOT:

- output: raw_dataset
  input: external_jdbc_system
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - APPEND
    - APPEND
- output: clean_dataset
  input: raw_dataset
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - SNAPSHOT
    - SNAPSHOT

If we write out clean_dataset using bucketing over the primary_key column, into a count of buckets calculated separately to fit the data scale we anticipate, we would need the following code:

from transforms.api import transform, incremental, Input, Output
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window


@incremental()  # required for the "added"/"current" read modes and the "replace" write mode
@transform(
    my_output=Output("/datasets/clean_dataset"),
    my_input=Input("/datasets/raw_dataset")
)
def my_compute_function(my_input, my_output):

    BUCKET_COUNT = 600
    PRIMARY_KEY = "primary_key"
    ORDER_COL = "update_ts"

    # Rows appended since the last build, plus the output as it was last written.
    updated_keys = my_input.dataframe("added")
    last_written = my_output.dataframe("current")

    # Pre-shuffle the (small) batch of new edits into the same layout as the
    # bucketed output; repartition returns a new DataFrame, so assign the result.
    updated_keys = updated_keys.repartition(BUCKET_COUNT, PRIMARY_KEY)

    # If several edits to the same key arrived since the last build, keep only
    # the most recent one before merging.
    dedupe_window = Window.partitionBy(PRIMARY_KEY).orderBy(F.col(ORDER_COL).desc())
    updated_keys = (
        updated_keys
        .withColumn("row_num", row_number().over(dedupe_window))
        .where(F.col("row_num") == 1)
        .drop("row_num")
    )

    value_cols = [x for x in last_written.columns if x != PRIMARY_KEY]

    # Prefix the non-key columns so the two sides can be told apart after the join.
    updated_keys = updated_keys.select(
      PRIMARY_KEY,
      *[F.col(x).alias("updated_keys_" + x) for x in value_cols]
    )

    last_written = last_written.select(
      PRIMARY_KEY,
      *[F.col(x).alias("last_written_" + x) for x in value_cols]
    )

    all_rows = updated_keys.join(last_written, PRIMARY_KEY, "fullouter")

    # Prefer the newly ingested value for a key; otherwise keep the prior version.
    latest_df = all_rows.select(
      PRIMARY_KEY,
      *[F.coalesce(
          F.col("updated_keys_" + x),
          F.col("last_written_" + x)
        ).alias(x) for x in value_cols]
    )

    my_output.set_mode("replace")

    # Rewrite the output, bucketed on the primary key so the next build can
    # merge into it without re-shuffling the whole history.
    return my_output.write_dataframe(
        latest_df,
        bucket_cols=PRIMARY_KEY,
        bucket_count=BUCKET_COUNT,
        sort_by=ORDER_COL
    )

When this runs, you'll notice in your query plan that the project step over the output no longer includes an exchange, which means it won't be shuffling that data. The only exchange you'll now see is on the input where it needs to distribute the changes in the exact same manner as the output was formatted (this is a very fast operation).

This exchange is then preserved into the fullouter join step, where the join will then exploit this and run the 600 tasks very quickly. Finally, we maintain the format on the output by explicitly bucketing into the same number of buckets over the same columns as before.
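
To see this for yourself, you can temporarily print the physical plan from inside the transform (a debugging aid, not part of the answer's code):

# Add before write_dataframe, then check the driver logs: the Exchange
# hashpartitioning(primary_key, 600) should appear only above the small
# "added" input, not above the previously written, bucketed output.
all_rows.explain(True)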

NOTE: with this approach, your file sizes in each bucket will grow over time and not take into account the need to increase bucket counts to keep things nicely sized. You will eventually hit a threshold with this technique where file sizes get above 128MB and you are no longer executing efficiently (the fix is to bump the BUCKET_COUNT value).
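
The bucket count itself is the piece 'calculated separately' above. A rough rule of thumb (my own heuristic, not something Foundry prescribes) is to aim for roughly 128MB of data per bucket and recompute the count as the output grows:

import math

TARGET_BUCKET_BYTES = 128 * 1024 * 1024  # keep each bucket's files near 128MB


def estimate_bucket_count(total_output_bytes):
    # Rule-of-thumb bucket count for an output dataset of the given size.
    return max(1, math.ceil(total_output_bytes / TARGET_BUCKET_BYTES))


# e.g. ~75GB of 'latest version' data suggests about 600 buckets,
# matching the BUCKET_COUNT used in the transform above.
print(estimate_bucket_count(75 * 1024 ** 3))  # -> 600

Comparing this estimate against the current BUCKET_COUNT from time to time tells you when it is worth paying for a re-bucketing SNAPSHOT.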

Your output will now look like the following:

- output: raw_dataset
  input: external_jdbc_system
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - APPEND
    - APPEND
- output: clean_dataset
  input: raw_dataset
  hive_partitioning: none
  bucketing: BUCKET_COUNT by PRIMARY_KEY
  transactions:
    - SNAPSHOT
    - SNAPSHOT
    - SNAPSHOT
