BigQuery - 时间序列和选择“最新"记录的最有效方法 [英] BigQuery - Time Series and most efficient way to select the 'latest' record

查看:24
本文介绍了BigQuery - 时间序列和选择“最新"记录的最有效方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我们的 BQ 设计中,我们有一个客户表(嵌套的原始数据),它是源自我们微服务层的事件(消耗 kinesis 流),其中每个事件都有该事件所针对的实体的最新实体快照(后处理更改后的图像).我猜是一种现代变化数据捕获.

In our BQ design we have a customer table (nested raw data) which is event sourced from our microservices layer (consuming of kinesis steams), where each event has the latest entity snapshot for the entity the event is for (post processing image after the change). A sort of modern change data capture I guess.

每个事件中的最新快照是我们填充 BigQuery 的方式 - 它以 APPEND ONLY 模式被提取并加载到 biqQuery(通过 apache spark 结构化流连接器).(这与对给定 ID 更改和更新一行不同)

This latest snapshot in each event is how we populate BigQuery - it is extracted and loaded into biqQuery (via apache spark structured steaming connector) in an APPEND ONLY mode. (This is different to say mutating and updating one row for a given ID)

因此,鉴于这只是追加,该表的大小当然会随着时间的推移而增长 - 事件中每次更改的条目.然而,它很好地是一个完整的客户状态和变化的时间序列(我们的要求),并且是不可变的.我们可以通过重放事件来重建完整的仓库,例如......在上下文中足够了.

So given this is append only, the size of this table can grow of course over time - an entry per change from an event. However it quite nicely is a full, timeseries of customer state and changes (our requirement to have), and is immutable as such. We can rebuild the full warehouse by replaying the events for example....enough on the context.

这样做的一个后果是加载到 BigQuery 可能会导致重复(例如,如果触发错误并重试微批处理,则 BQ 在按作业加载时不是幂等的结构化流接收器,或者只是由于分布式性质,它通常是可能的).SteamingInserts 可能是以后需要研究的东西,因为它有助于重复数据删除.....

One consequence of this is the fact that loading into BigQuery may result in duplicates (e.g if spark error and retries a micro batch, BQ isnt an idempotent structured streaming sink when loading by jobs, or just due to distributed nature its generally possible). SteamingInserts might be something to look into later as it helps with deduping.....

这种架构的结果是我需要在原始时间序列数据(记住偶尔会有重复)之上的视图,该视图在这些条件下返回最新记录.

The result of this architecture is that I need a view ontop of the raw time series data (remember can occasionally have duplicates) that returns the LATEST record under these conditions.

最新由客户记录 (metadata.lastUpdated) 上的元数据结构字段确定 - 具有 MAX(metadata.lastUpdated) 的行是最新的.这是由我们的 MS 层保证的.

Latest is determined by a metadata struct field on the customer record (metadata.lastUpdated) - and the row with the MAX(metadata.lastUpdated) is the latest. This is guarneteed by our MS layer.

这也是一个真实的事件时间时间戳.表 id DAY 分区并有一个 _PARTITIONTIME 列,但这只是一个摄取时间,我不能使用它.当我可以指定要用作分区时间的列时会很棒!(愿望清单).

This is a true event time timestamp as well. The table id DAY partitioned and has a _PARTITIONTIME column, but this is just an ingest time and I cant use this. Be great when I can specify a column to be used as the partition time! (wishlist).

重复将是具有相同客户id"和metadata.lastUpdated"的两行 - 所以 MAX(metadata.lastUpdated) 可以返回 2 行,所以我需要使用

A duplicate will be two rows with the SAME customer 'id' AND 'metadata.lastUpdated' - so MAX(metadata.lastUpdated) could return 2 rows, so I need to use

ROW_NUMBER() OVER (PARTITION BY .... 所以我可以选择 rowNum=1

ROW_NUMBER() OVER (PARTITION BY .... so I can select the rowNum=1

在我看来,也只选择有重复的 1 行.

In my view as well to only select 1 row where there is dups.

好的,足够的词/上下文(抱歉),下面是我查看 SQL 以获取最新信息.它在我的测试中有效,但是当表的大小/行数变大时,我不确定这是实现我的结果的最有效方法,并且想知道那里是否有任何 BigQuery boffins 可能有更高效/更聪明的方法SQL要做到这一点?为什么 SQL 可以,但绝对不是性能调优方面的专家,尤其是为 BQ 性能调优执行 SQL 的最佳方法.

Ok so enough words/context (sorry), below is my view SQL to get the latest. It works from my tests, but I am not sure it is the most efficient way to achieve my outcome when the size of the table / number of rows gets large, and was wondering if any BigQuery boffins out there might have a more efficient / clever SQL to do this? Why SQL is OK, but by no means an expert in performance tuning for sure and in particular the best ways to do SQL for BQ perf tunning.

我只是希望能够将所有数据放在一个表中,并依靠 dremel 引擎的强大功能来查询它,而不是需要有多个表或做任何太复杂的事情.

I was just hoping to be able to have all the data in one table and rely on the power of the dremel engine to just query it, rather then needing to have multiple tables or do anyting too complex.

所以我的 SQL 在下面.注意 - 我的时间戳是作为字符串摄取的,所以也需要在视图中解析它.

SO my SQL is below. Note - my timestamp is ingested as a string, so need to PARSE this in the view too.

WITH
  cus_latest_watermark AS (
  SELECT
    id,
    MAX(PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", metadata.lastUpdated)) AS maxLastUpdatedTimestampUTC
  FROM
    `project.dataset.customer_refdata`
  GROUP BY
    id ),
  cust_latest_rec_dup AS (
  SELECT
    cus.*,
    ROW_NUMBER() OVER (PARTITION BY cus.id ORDER BY cus.id) AS rowNum
  FROM
    `project.dataset.customer_refdata` cus
  JOIN
    cus_latest_watermark
  ON
    cus.id = cus_latest_watermark.id
    AND PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", cus.metadata.lastUpdated) = cus_latest_watermark.maxLastUpdatedTimestampUTC)
SELECT
  cust_latest_rec_dup.* EXCEPT(rowNum)
FROM
  cust_latest_rec_dup
WHERE
  rowNum = 1

谢谢!

推荐答案

试试下面的 BigQuery Standard SQL

Try below for BigQuery Standard SQL

#standardSQL
WITH cus_watermark AS (
  SELECT
    *,
    PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", metadata.lastUpdated) AS UpdatedTimestampUTC
  FROM `project.dataset.customer_refdata`
),
cust_latest_rec_dup AS (
  SELECT 
    *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY UpdatedTimestampUTC DESC) AS rowNum
  FROM cus_watermark
)
SELECT * EXCEPT(rowNum)
FROM cust_latest_rec_dup
WHERE rowNum = 1  

您可以使用以下虚拟数据播放/测试此方法

You can play/test this approach with below dummy data

#standardSQL
WITH `project.dataset.customer_refdata` AS (
  SELECT 1 AS id, '2017-07-14 16:47:27' AS lastUpdated UNION ALL
  SELECT 1, '2017-07-14 16:47:27' UNION ALL
  SELECT 1, '2017-07-14 17:47:27' UNION ALL
  SELECT 1, '2017-07-14 18:47:27' UNION ALL
  SELECT 2, '2017-07-14 16:57:27' UNION ALL
  SELECT 2, '2017-07-14 17:57:27' UNION ALL
  SELECT 2, '2017-07-14 18:57:27' 
),
cus_watermark AS (
  SELECT
    *,
    PARSE_TIMESTAMP("%Y-%m-%d %T", lastUpdated) AS UpdatedTimestampUTC
  FROM `project.dataset.customer_refdata`
),
cust_latest_rec_dup AS (
  SELECT 
    *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY UpdatedTimestampUTC DESC) AS rowNum
  FROM cus_watermark
)
SELECT * EXCEPT(rowNum)
FROM cust_latest_rec_dup
WHERE rowNum = 1

这篇关于BigQuery - 时间序列和选择“最新"记录的最有效方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆