How to PURGE Stage or delete files from Stage when using SNOWPIPE?
Snowflake provides Snowpipe to Copy data into a Table as soon as it is available in a Stage, but it misses the option of Purge.
Is there another way to achieve this?
There is no direct way to achieve a purge with Snowpipe, but it can be achieved through a combination of Snowpipe, a Stream, and a Task.
Let's assume the data files to be loaded reside in a GCS bucket.
Step 1: Create a Snowpipe on Snowflake with an External stage
Refer to this Documentation
// Create a Staging Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE (COL1 STRING);
// Create Destination Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE (COL1 STRING);
// Create an External Stage
CREATE STAGE SNOWPIPE_DB.PUBLIC.GCP_STAGE
URL='gcs://bucket/files/'
STORAGE_INTEGRATION = <STORAGE_INTEGRATION>; // an identifier, not a quoted string
// Create Snowpipe
CREATE PIPE SNOWPIPE_DB.PUBLIC.GCP_Pipe
AUTO_INGEST = true
INTEGRATION = '<NOTIFICATION_INTEGRATION>'
AS
COPY INTO SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
FROM @SNOWPIPE_DB.PUBLIC.GCP_STAGE;
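To confirm the pipe is running before moving on, Snowflake's SYSTEM$PIPE_STATUS function can be queried (a verification step not in the original answer; the pipe name matches the one created above):

```sql
// Returns JSON describing the pipe, e.g. its executionState and pendingFileCount
SELECT SYSTEM$PIPE_STATUS('SNOWPIPE_DB.PUBLIC.GCP_Pipe');
```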
Step 2: Create Stream on Table GCP_STAGE_TABLE
A stream records data manipulation language (DML) changes made to a table, including information about inserts, updates, and deletes.
Refer to this Documentation
// Create Stream in APPEND_ONLY Mode since we are concerned with INSERTS only
CREATE OR REPLACE STREAM SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM
ON TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
APPEND_ONLY = TRUE;
Now, whenever some data is uploaded to the GCS bucket, GCP_STAGE_TABLE is populated by Snowpipe, and so is our Stream RESPONSES_STREAM.
RESPONSES_STREAM would look like this:

COL1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
---|---|---|---
MOHAMMED | INSERT | FALSE | kjee941e66d4ca4hhh1e2b8ddba12c9c905a829
TURKY | INSERT | FALSE | b7c5uytba6c1jhhfb6e9d85e3d3cfd7249192b0d8
Since the Stream is in APPEND_ONLY mode, we will only ever see INSERT in METADATA$ACTION.
Step 3: Create a Procedure to PURGE
the stage and Populate GCP_DESTINATION_TABLE
// Create a Procedure
CREATE OR REPLACE PROCEDURE Load_Data()
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
AS
$$
var purgeStage = `REMOVE @SNOWPIPE_DB.PUBLIC.GCP_STAGE`;
var populateTable = `INSERT INTO SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE
SELECT * FROM RESPONSES_STREAM`;
try {
snowflake.execute ( {sqlText: purgeStage} );
snowflake.execute ( {sqlText: populateTable} );
return "Succeeded.";
}
catch (err) {
return "Failed: " + err;
}
$$;
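Once created, the procedure can be invoked manually to verify it works before wiring it to a Task:

```sql
// Manually invoke the procedure; it returns "Succeeded." or "Failed: <error>"
CALL Load_Data();
```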
The above Procedure uses the REMOVE command to purge the Stage and populates the table GCP_DESTINATION_TABLE. Populating GCP_DESTINATION_TABLE from the Stream RESPONSES_STREAM consumes the Stream: the DML advances the Stream's offset, so the same rows are not read again.
Step 4: Create a Task to call the Procedure Load_Data()
Refer to this Documentation
We create a Task with a 5-minute interval which first checks the Stream RESPONSES_STREAM for any data loaded to GCP_STAGE_TABLE; if true, it executes the Procedure Load_Data().
// Task DDL
CREATE OR REPLACE TASK MASTER_TASK
WAREHOUSE = LOAD_WH
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM') //Checks the stream for Data
AS
CALL Load_Data();
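One easy-to-miss detail not shown above: Snowflake creates tasks in a suspended state, so the task must be resumed before the schedule takes effect:

```sql
// Tasks are created suspended; resume it so the schedule starts firing
ALTER TASK MASTER_TASK RESUME;
```

Resuming requires the EXECUTE TASK privilege (or ownership of the task).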
SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM') evaluates to True when data has been loaded to GCP_STAGE_TABLE, which makes the Task execute the Procedure call.
Even though the Procedure is not called every 5 minutes, it's worth noting that the WHEN SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM') check itself does consume a small amount of compute resources; to reduce this, the schedule can be changed from 5 minutes to a longer interval.
To make this a full ELT pipeline, the Procedure can include transformation logic, and a tree of Tasks can be built.
Note: REMOVE is not officially supported for external stages, but it still worked against a GCS bucket for me. Let me know if it works for you with AWS S3 and Azure.