在雪花中重放未完成的雪花通知/消息 [英] Replaying outstanding snowpipe notifications/messages in Snowflake

查看:17
本文介绍了在雪花中重放未完成的雪花通知/消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

重新创建管道时,可能会丢失一些通知。有没有办法重播这些错过的通知?刷新管道是危险的(因此不是一种选择),因为重新创建管道时加载历史记录会丢失(因此可能会导致两次摄取相同的文件&;创建重复记录)

Snowflake记录了如何使用自动数据加载重新创建管道的过程(link)。遗憾的是,可能会错过步骤1(暂停管道)和步骤3(重新创建管道)之间传入的任何新通知。即使通过使用过程自动化该过程,我们也可以缩小窗口,但不能消除它。我已经通过多次测试证实了这一点。即使不暂停前一个管道,发生这种情况的可能性仍然很小。

但是,Snowflake知道这些通知,因为通知队列独立于管道(并为整个帐户共享)。但是在错误时间收到的通知永远不会被处理(我猜如果当时没有活动管道来处理它们,这是有意义的)。

我认为我们可以在管道状态的numOutstandingMessagesOnChannel属性中看到这些通知,但我找不到有关这方面的更多信息,也找不到如何处理这些通知。我想当管道更换时,它们可能会丢失。😞

注意:这与我在Snowflake(link)中重新创建管道时有关保留加载历史记录的另一个问题有关。

推荐答案

假设无法重放未完成的通知,我创建了一个过程来检测无法自动加载的文件。此方法的一个好处是,它还可以检测由于任何原因而无法加载的任何文件(不仅仅是丢失的通知)。

可以这样调用该过程:

CALL verify_pipe_load(
  'my_db.my_schema.my_pipe', -- Pipe name
  'my_db.my_schema.my_stage', -- Stage name
  'my_db.my_schema.my_table', -- Table name
  '/YYYY/MM/DD/HH/', -- File prefix
  'YYYY-MM-DD', -- Start time for the loads
  'ERROR' -- Mode
);

以下是它的高级工作原理:

  • 首先,它查找阶段中与指定前缀匹配的所有文件(使用LIST command),减去轻微延迟以说明延迟。
  • 然后,它会从这些文件中查找COPY_HISTORY中没有记录的所有文件。
  • 最后,它根据模式以三种方式之一处理丢失的文件加载:
    • 'ERROR'模式将通过引发异常中止该过程。这对于自动连续监视管道并确保不会丢失文件非常有用。只需将其与您选择的自动化工具连接起来!我们使用DBT+DBT Cloud
    • 'INGEST'模式将仅对那些特定文件使用REFRESH command自动将文件重新排队以供SnowPipe摄取。
    • 'RETURN'模式只返回响应中的文件列表。

以下是该过程的代码:

-- Returns a list of files missing from the destination table (separated by new lines).
-- Returns NULL if there are no missing files.
CREATE OR REPLACE PROCEDURE verify_pipe_load(
  -- The FQN of the pipe (used to auto ingest):
  PIPE_FQN STRING,
  -- Stage to get the files from (same as the pipe definition):
  STAGE_NAME STRING,
  -- Destination table FQN (same as the pipe definition):
  TABLE_FQN STRING,
  -- File prefix (to filter files):
  --  This should be based on a timestamp (ex: /YYYY/MM/DD/HH/)
  --  in order to restrict files to a specific time interval
  PREFIX STRING,
  -- The time to get the loaded files from (should match the prefix):
  START_TIME STRING,
  -- What to do with the missing files (if any):
  --  'RETURN': Return the list of missing files.
  --  'INGEST': Automatically ingest the missing files (and return the list).
  --  'ERROR': Make the procedure fail by throwing an exception.
  MODE STRING
)
  RETURNS STRING
  LANGUAGE JAVASCRIPT
  EXECUTE AS CALLER
AS
$$
  MODE = MODE.toUpperCase();
  if (!['RETURN', 'INGEST', 'ERROR'].includes(MODE)) {
    throw `Exception: Invalid mode '${MODE}'. Must be one of 'RETURN', 'INGEST' or 'ERROR'`;
  }

  let tableDB = TABLE_FQN.split('.')[0];
  let [pipeDB, pipeSchema, pipeName] = PIPE_FQN.split('.')
    .map(name => name.startsWith('"') && name.endsWith('"')
      ? name.slice(1, -1)
      : name.toUpperCase()
    );

  let listQueryId = snowflake.execute({sqlText: `
    LIST @${STAGE_NAME}${PREFIX};
  `}).getQueryId();

  let missingFiles = snowflake.execute({sqlText: `
    WITH staged_files AS (
      SELECT
        "name" AS name,
        TO_TIMESTAMP_NTZ(
          "last_modified",
          'DY, DD MON YYYY HH24:MI:SS GMT'
        ) AS last_modified,
        -- Add a minute per GB, to account for larger file size = longer ingest time
        ROUND("size" / 1024 / 1024 / 1024) AS ingest_delay,
        -- Estimate the time by which the ingest should be done (default 5 minutes)
        DATEADD(minute, 5 + ingest_delay, last_modified) AS ingest_done_ts
      FROM TABLE(RESULT_SCAN('${listQueryId}'))
      -- Ignore files that may not be done being ingested yet
      WHERE ingest_done_ts < CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())::TIMESTAMP_NTZ
    ), loaded_files AS (
      SELECT stage_location || file_name AS name
      FROM TABLE(
        ${tableDB}.information_schema.copy_history(
          table_name => '${TABLE_FQN}',
          start_time => '${START_TIME}'::TIMESTAMP_LTZ
        )
      )
      WHERE pipe_catalog_name = '${pipeDB}'
        AND pipe_schema_name = '${pipeSchema}'
        AND pipe_name = '${pipeName}'
    ), stage AS (
      SELECT DISTINCT stage_location
      FROM TABLE(
        ${tableDB}.information_schema.copy_history(
          table_name => '${TABLE_FQN}',
          start_time => '${START_TIME}'::TIMESTAMP_LTZ
        )
      )
      WHERE pipe_catalog_name = '${pipeDB}'
        AND pipe_schema_name = '${pipeSchema}'
        AND pipe_name = '${pipeName}'
    ), missing_files AS (
      SELECT REPLACE(name, stage_location) AS prefix
      FROM staged_files
      CROSS JOIN stage
      WHERE name NOT IN (
        SELECT name FROM loaded_files
      )
    )
    SELECT LISTAGG(prefix, '
') AS "missing_files"
    FROM missing_files;
  `});

  if (!missingFiles.next()) return null;
  missingFiles = missingFiles.getColumnValue('missing_files');
  if (missingFiles.length == 0) return null;

  if (MODE == 'ERROR') {
    throw `Exception: Found missing files:
'${missingFiles}'`;
  }

  if (MODE == 'INGEST') {
    missingFiles
      .split('
')
      .forEach(file => snowflake.execute({sqlText: `
        ALTER PIPE ${PIPE_FQN} REFRESH prefix='${file}';
      `}));
  }

  return missingFiles;
$$
;

这篇关于在雪花中重放未完成的雪花通知/消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆