How to PURGE Stage or delete files from Stage when using SNOWPIPE?

Problem Description

Snowflake provides Snowpipe to copy data into a Table as soon as it is available in a Stage, but it lacks a Purge option.
Is there another way to achieve this?

Solution

There's no direct way to achieve a Purge in the case of Snowpipe, but it can be achieved through a combination of a Snowpipe, a Stream, and a Task.

Let's assume the data files to be loaded reside in a GCS Bucket.

Step 1: Create a Snowpipe on Snowflake with an External stage
Refer to this Documentation

// Create a Staging Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE (COL1 STRING);

// Create Destination Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE (COL1 STRING);

// Create an External Stage
// (STORAGE_INTEGRATION takes an identifier, not a quoted string)
CREATE STAGE SNOWPIPE_DB.PUBLIC.GCP_STAGE
  URL = 'gcs://bucket/files/'
  STORAGE_INTEGRATION = <STORAGE_INTEGRATION>;

// Create Snowpipe
CREATE PIPE SNOWPIPE_DB.PUBLIC.GCP_Pipe
  AUTO_INGEST = true
  INTEGRATION = '<NOTIFICATION_INTEGRATION>'
  AS
  COPY INTO SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
  FROM @SNOWPIPE_DB.PUBLIC.GCP_STAGE;
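
Once the pipe exists, its state can be checked before moving on; a quick sanity check using Snowflake's SYSTEM$PIPE_STATUS function (the pipe name matches the DDL above):

// Returns a JSON string with executionState, pendingFileCount, etc.
SELECT SYSTEM$PIPE_STATUS('SNOWPIPE_DB.PUBLIC.GCP_Pipe');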

Step 2: Create Stream on Table GCP_STAGE_TABLE

A stream records data manipulation language (DML) changes made to a table, including information about inserts, updates, and deletes.
Refer to this Documentation

// Create Stream in APPEND_ONLY Mode since we are concerned with INSERTS only

CREATE OR REPLACE STREAM SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM
  ON TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
  APPEND_ONLY = TRUE;

Now, whenever some data is uploaded to the GCS Bucket, GCP_STAGE_TABLE is populated by Snowpipe, and so is our Stream RESPONSES_STREAM.

RESPONSES_STREAM would look like this:

COL1      METADATA$ACTION  METADATA$ISUPDATE  METADATA$ROW_ID
--------  ---------------  -----------------  -----------------------------------------
MOHAMMED  INSERT           FALSE              kjee941e66d4ca4hhh1e2b8ddba12c9c905a829
TURKY     INSERT           FALSE              b7c5uytba6c1jhhfb6e9d85e3d3cfd7249192b0d8

Since the Stream has APPEND_ONLY Mode, we will only see INSERT in METADATA$ACTION.
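
To see rows like the ones above, the stream can be queried directly; a minimal query, assuming the objects created in Steps 1 and 2:

SELECT COL1, METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
FROM SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM;

Note that a plain SELECT does not advance the stream's offset; only consuming the stream in a DML statement does.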

Step 3: Create a Procedure to PURGE the stage and Populate GCP_DESTINATION_TABLE

// Create a Procedure
CREATE OR REPLACE PROCEDURE Load_Data()
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    AS 
    $$
    var purgeStage = `REMOVE @SNOWPIPE_DB.PUBLIC.GCP_STAGE`; 
    // Select only the table's columns; SELECT * on a stream would also pull the METADATA$ columns
    var populateTable = `INSERT INTO SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE
                         SELECT COL1 FROM SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM`;
    try {
        snowflake.execute ( {sqlText: purgeStage} );
        snowflake.execute ( {sqlText: populateTable} );
        return "Succeeded."; 
        }
    catch (err)  {
        return "Failed: " + err; 
        }
    $$;

The above Procedure uses the REMOVE command to purge the Stage and populates the Table GCP_DESTINATION_TABLE.
Populating the Table GCP_DESTINATION_TABLE from the Stream RESPONSES_STREAM clears the Stream, since consuming a stream in a DML statement advances its offset.
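
To verify this behavior, the Procedure can be run by hand and the Stream checked afterwards; a minimal sketch, assuming the objects above:

// Run the procedure manually once
CALL Load_Data();

// Should now return FALSE, since the INSERT ... SELECT consumed the stream
SELECT SYSTEM$STREAM_HAS_DATA('SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM');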

Step 4: Create a Task to call the Procedure Load_Data()
Refer to this Documentation

We create a Task with a 5-minute interval which first checks the Stream RESPONSES_STREAM for any data loaded to GCP_STAGE_TABLE and, if true, executes the Procedure Load_Data().

// Task DDL
CREATE OR REPLACE TASK MASTER_TASK
    WAREHOUSE  = LOAD_WH
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM') // Checks the stream for data
AS 
    CALL Load_Data();

// Newly created tasks are suspended; resume it so the schedule takes effect
ALTER TASK MASTER_TASK RESUME;

SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM') evaluates to True when data has been loaded into GCP_STAGE_TABLE; the Task then executes the Procedure call.
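
To confirm that the Task fires (and skips runs while the stream is empty), its run history can be inspected; a sketch using the standard TASK_HISTORY table function:

SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'MASTER_TASK'))
ORDER BY SCHEDULED_TIME DESC;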

Even though the Procedure is not called every 5 minutes, it's worth noting that evaluating WHEN SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM') does consume a small amount of compute resources; to reduce this, the schedule can be changed from 5 minutes to a longer interval.
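
For example, to stretch the schedule to hourly (a task must be suspended before it can be altered, then resumed):

ALTER TASK MASTER_TASK SUSPEND;
ALTER TASK MASTER_TASK SET SCHEDULE = '60 MINUTE';
ALTER TASK MASTER_TASK RESUME;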

To make this an ELT pipeline, the Procedure can include some transformation logic, and a tree of Tasks can be built; a hypothetical sketch follows.
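
As a sketch of such a tree, a child Task can be chained to run after MASTER_TASK with the AFTER clause; TRANSFORMED_TABLE and the UPPER() transformation here are illustrative assumptions, not part of the original setup:

// Child task runs each time MASTER_TASK completes (illustrative only)
CREATE OR REPLACE TASK TRANSFORM_TASK
    WAREHOUSE = LOAD_WH
    AFTER MASTER_TASK
AS
    INSERT INTO SNOWPIPE_DB.PUBLIC.TRANSFORMED_TABLE
    SELECT UPPER(COL1) FROM SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE;

// Child tasks must be resumed as well
ALTER TASK TRANSFORM_TASK RESUME;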

Note:

  1. REMOVE is not officially supported for External stages, but it still worked for me with a GCS Bucket.
  2. Let me know if it works for you in the case of AWS S3 and Azure.
