Replaying outstanding snowpipe notifications/messages in Snowflake
Question
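When a pipe is re-created, some notifications may be missed. Is there any way to replay these missed notifications? Refreshing the pipe is dangerous (so not an option), because the load history is lost when the pipe is re-created, which could result in ingesting the same files twice and creating duplicate records.
Snowflake has documented a process for re-creating pipes with automated data loading (link). Unfortunately, any new notifications that arrive between step 1 (pause the pipe) and step 3 (re-create the pipe) can be missed. Automating the process with a procedure shrinks the window but does not eliminate it. I have confirmed this with multiple tests. Even without pausing the previous pipe, there is still a slim chance of this happening.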
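However, Snowflake is aware of these notifications, because the notification queue is separate from the pipes (and shared across the entire account). Notifications received at the "wrong" time are simply never processed (which I guess makes sense if there is no active pipe to process them at that moment). I think we can see these notifications in the numOutstandingMessagesOnChannel property of the pipe status, but I can't find more information about it, nor how to get those notifications processed. I suspect they are simply lost when the pipe is replaced. 😞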
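For reference, the pipe status mentioned above is returned by SYSTEM$PIPE_STATUS as a JSON string. Below is a minimal sketch of reading the counter out of that string in JavaScript. The sample payload and the helper name outstandingMessages are made up for illustration, and only a few of the status fields are shown.

```javascript
// Hypothetical sample of the JSON string returned by
//   SELECT SYSTEM$PIPE_STATUS('my_db.my_schema.my_pipe');
// The values are illustrative, not real output.
const sampleStatus = JSON.stringify({
  executionState: 'RUNNING',
  pendingFileCount: 0,
  numOutstandingMessagesOnChannel: 3,
});

// Returns the number of outstanding notifications reported for the pipe,
// or 0 when the field is absent from the status document.
function outstandingMessages(statusJson) {
  const status = JSON.parse(statusJson);
  return status.numOutstandingMessagesOnChannel ?? 0;
}

console.log(outstandingMessages(sampleStatus));
```

This only reads the number; it does not show which files the outstanding messages refer to.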
Note: This is related to another question I asked about preserving the load history when re-creating pipes in Snowflake (link).
Recommended answer
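Assuming there is no way to replay outstanding notifications, I created a procedure to detect files that failed to load automatically. A benefit of this approach is that it also detects files that failed to load for any reason, not only because of missed notifications.
The procedure can be called like this: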
CALL verify_pipe_load(
    'my_db.my_schema.my_pipe',  -- Pipe name
    'my_db.my_schema.my_stage', -- Stage name
    'my_db.my_schema.my_table', -- Table name
    '/YYYY/MM/DD/HH/',          -- File prefix
    'YYYY-MM-DD',               -- Start time for the loads
    'ERROR'                     -- Mode
);
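Here is how it works, at a high level:
- First, it finds all the files in the stage that match the specified prefix (using the LIST command), skipping files modified too recently to have been ingested yet, to account for latency.
- Then, out of those files, it finds all the ones that have no record in COPY_HISTORY.
- Finally, it handles those missing file loads in one of three ways, depending on the mode:
  - The 'ERROR' mode aborts the procedure by throwing an exception. This is useful for automating continuous monitoring of pipes and ensuring no files are missed. Just hook it up to your automation tool of choice! We use DBT + DBT Cloud.
  - The 'INGEST' mode automatically re-queues the files for ingestion by Snowpipe, using the REFRESH command for those specific files only.
  - The 'RETURN' mode simply returns the list of files in the response.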
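The three steps above can be sketched on plain arrays, without any Snowflake calls, which makes the logic easy to try out locally. This is only an illustration under stated assumptions: the function names findMissingFiles and handleMissing, the input shapes, and the requeue callback are hypothetical stand-ins for the LIST output, the COPY_HISTORY lookup and the per-file REFRESH call.

```javascript
// stagedFiles: [{ name, lastModified: Date, sizeBytes }], as LIST would report them.
// loadedNames: names that already have a COPY_HISTORY record.
function findMissingFiles(stagedFiles, loadedNames, now = new Date()) {
  const GB = 1024 ** 3;
  const loaded = new Set(loadedNames);
  return stagedFiles
    .filter(f => {
      // 5 minutes of grace, plus one minute per GB of file size (rounded).
      const delayMinutes = 5 + Math.round(f.sizeBytes / GB);
      const ingestDoneTs = f.lastModified.getTime() + delayMinutes * 60000;
      // Ignore files that may still be in the middle of being ingested.
      return ingestDoneTs < now.getTime();
    })
    .filter(f => !loaded.has(f.name))
    .map(f => f.name);
}

// Mode handling; `requeue` is a hypothetical callback standing in for
// the per-file ALTER PIPE ... REFRESH statement.
function handleMissing(missing, mode, requeue = () => {}) {
  if (missing.length === 0) return null;
  if (mode === 'ERROR') {
    throw new Error(`Found missing files:\n${missing.join('\n')}`);
  }
  if (mode === 'INGEST') missing.forEach(file => requeue(file));
  return missing.join('\n');
}

// Example: a.csv is loaded, b.csv is old enough but missing, c.csv is too recent.
const now = new Date('2021-01-01T12:00:00Z');
const staged = [
  { name: 'a.csv', lastModified: new Date('2021-01-01T11:00:00Z'), sizeBytes: 100 },
  { name: 'b.csv', lastModified: new Date('2021-01-01T11:10:00Z'), sizeBytes: 100 },
  { name: 'c.csv', lastModified: new Date('2021-01-01T11:58:00Z'), sizeBytes: 100 },
];
console.log(findMissingFiles(staged, ['a.csv'], now)); // [ 'b.csv' ]
```

Keeping detection separate from handling means the same list of missing files can feed any of the three modes.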
Here is the code for the procedure:
-- Returns a list of files missing from the destination table (separated by new lines).
-- Returns NULL if there are no missing files.
CREATE OR REPLACE PROCEDURE verify_pipe_load(
    -- The FQN of the pipe (used to auto ingest):
    PIPE_FQN STRING,
    -- Stage to get the files from (same as the pipe definition):
    STAGE_NAME STRING,
    -- Destination table FQN (same as the pipe definition):
    TABLE_FQN STRING,
    -- File prefix (to filter files):
    -- This should be based on a timestamp (ex: /YYYY/MM/DD/HH/)
    -- in order to restrict files to a specific time interval
    PREFIX STRING,
    -- The time to get the loaded files from (should match the prefix):
    START_TIME STRING,
    -- What to do with the missing files (if any):
    -- 'RETURN': Return the list of missing files.
    -- 'INGEST': Automatically ingest the missing files (and return the list).
    -- 'ERROR': Make the procedure fail by throwing an exception.
    MODE STRING
)
RETURNS STRING
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
    MODE = MODE.toUpperCase();
    if (!['RETURN', 'INGEST', 'ERROR'].includes(MODE)) {
        throw `Exception: Invalid mode '${MODE}'. Must be one of 'RETURN', 'INGEST' or 'ERROR'`;
    }

    let tableDB = TABLE_FQN.split('.')[0];
    // Unquoted identifiers are upper-cased; quoted ones keep their case (quotes removed)
    let [pipeDB, pipeSchema, pipeName] = PIPE_FQN.split('.')
        .map(name => name.startsWith('"') && name.endsWith('"')
            ? name.slice(1, -1)
            : name.toUpperCase()
        );

    // List the staged files; the result is read back below through RESULT_SCAN
    let listQueryId = snowflake.execute({sqlText: `
        LIST @${STAGE_NAME}${PREFIX};
    `}).getQueryId();

    let missingFiles = snowflake.execute({sqlText: `
        WITH staged_files AS (
            SELECT
                "name" AS name,
                TO_TIMESTAMP_NTZ(
                    "last_modified",
                    'DY, DD MON YYYY HH24:MI:SS GMT'
                ) AS last_modified,
                -- Add a minute per GB, to account for larger file size = longer ingest time
                ROUND("size" / 1024 / 1024 / 1024) AS ingest_delay,
                -- Estimate the time by which the ingest should be done (default 5 minutes)
                DATEADD(minute, 5 + ingest_delay, last_modified) AS ingest_done_ts
            FROM TABLE(RESULT_SCAN('${listQueryId}'))
            -- Ignore files that may not be done being ingested yet
            WHERE ingest_done_ts < CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())::TIMESTAMP_NTZ
        ), loaded_files AS (
            SELECT stage_location || file_name AS name
            FROM TABLE(
                ${tableDB}.information_schema.copy_history(
                    table_name => '${TABLE_FQN}',
                    start_time => '${START_TIME}'::TIMESTAMP_LTZ
                )
            )
            WHERE pipe_catalog_name = '${pipeDB}'
                AND pipe_schema_name = '${pipeSchema}'
                AND pipe_name = '${pipeName}'
        ), stage AS (
            SELECT DISTINCT stage_location
            FROM TABLE(
                ${tableDB}.information_schema.copy_history(
                    table_name => '${TABLE_FQN}',
                    start_time => '${START_TIME}'::TIMESTAMP_LTZ
                )
            )
            WHERE pipe_catalog_name = '${pipeDB}'
                AND pipe_schema_name = '${pipeSchema}'
                AND pipe_name = '${pipeName}'
        ), missing_files AS (
            SELECT REPLACE(name, stage_location) AS prefix
            FROM staged_files
            CROSS JOIN stage
            WHERE name NOT IN (
                SELECT name FROM loaded_files
            )
        )
        SELECT LISTAGG(prefix, '\n') AS "missing_files"
        FROM missing_files;
    `});

    if (!missingFiles.next()) return null;
    missingFiles = missingFiles.getColumnValue('missing_files');
    if (missingFiles.length == 0) return null;

    if (MODE == 'ERROR') {
        throw `Exception: Found missing files:\n'${missingFiles}'`;
    }

    if (MODE == 'INGEST') {
        // Re-queue each missing file individually
        missingFiles
            .split('\n')
            .forEach(file => snowflake.execute({sqlText: `
                ALTER PIPE ${PIPE_FQN} REFRESH prefix='${file}';
            `}));
    }

    return missingFiles;
$$
;
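One detail in the procedure that is easy to miss is the identifier handling. Snowflake resolves unquoted identifiers as upper case, so the pipe name parts are normalized before being compared against the pipe_catalog_name, pipe_schema_name and pipe_name columns. The same logic can be tried in isolation. The function name normalizeFqn is hypothetical, and like the procedure it assumes no dots inside quoted identifiers.

```javascript
// Mirrors the identifier handling in the procedure: unquoted parts are
// upper-cased, double-quoted parts keep their case with the quotes removed.
function normalizeFqn(fqn) {
  return fqn.split('.').map(name =>
    name.startsWith('"') && name.endsWith('"')
      ? name.slice(1, -1)
      : name.toUpperCase()
  );
}

console.log(normalizeFqn('my_db.my_schema.my_pipe'));
// [ 'MY_DB', 'MY_SCHEMA', 'MY_PIPE' ]
console.log(normalizeFqn('my_db."MySchema".my_pipe'));
// [ 'MY_DB', 'MySchema', 'MY_PIPE' ]
```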