How can I automatically infer schemas of CSV files on S3 as I load them?


Question

Background

Currently I am using Snowflake as a Data Warehouse and AWS' S3 as a data lake. The majority of the files that land on S3 are in the Parquet format. For these, I am using a new limited feature by Snowflake (documented here) that automatically detects the schema from the parquet files on S3, which I can use to generate a CREATE TABLE statement with the correct column names and inferred data types. This feature currently only works for Apache Parquet, Avro, and ORC files. I would like to find a way that achieves the same desired objective but for CSV files.

What I have tried

This is how I currently infer the schema for Parquet files:

select generate_column_description(array_agg(object_construct(*)), 'table') as columns 
from table (infer_schema(location=>'${LOCATION}', file_format=>'${FILE_FORMAT}'))

However, if I try specifying the FILE_FORMAT as csv, that approach fails.
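
For illustration, the shape of the failing attempt (the stage and file format names are hypothetical):

select generate_column_description(array_agg(object_construct(*)), 'table') as columns 
from table (infer_schema(location=>'@MY_STAGE', file_format=>'MY_CSV_FORMAT'))
-- errors out, since INFER_SCHEMA only accepts Parquet, Avro, and ORC file formats here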

Other approaches I have considered:

  1. Convert all files landing on S3 to Parquet (this involves more code and infrastructure setup, so it is not my preferred option, especially since I would like to keep some files in their natural format on S3).
  2. Have a script (e.g., using a library like Pandas in Python) infer the schemas of the files on S3 (this also involves more code, and would be odd in the sense that Parquet files would be handled by Snowflake while non-Parquet files would be handled by some script on AWS).
  3. Use a Snowflake UDF to infer the schema. I have not fully thought through my options there.

Desired behavior

As a new csv file lands on S3 (on a pre-existing STAGE), I would like to infer the schema, and be able to generate a CREATE TABLE statement with the inferred data types. Preferably, I would like to do that within Snowflake, since the schema-inference solution mentioned above already lives there. Happy to add further information if needed.

Answer

I wrote a stored procedure to assist with this; however, its only goal is to infer the data types of untyped columns. It works as follows:

  1. Load the CSV into a table with all columns defined as varchars (see the sketch after this list).
  2. Call the SP with a query against the new table (the point is to select only the columns you want, and to limit the row count to keep the type-inference time reasonable).
  3. The SP call also takes the database, schema, and table for both the old and new locations: the old all-varchar table and the new table with inferred types.
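
As a concrete illustration of step 1, a minimal sketch (the stage, table, and column names are placeholders; widen the column list to match your file):

create or replace table MY_DB.PUBLIC.CSV_UNTYPED (C1 varchar, C2 varchar, C3 varchar);

copy into MY_DB.PUBLIC.CSV_UNTYPED
from @MY_STAGE/path/file.csv
file_format = (type = 'csv' skip_header = 1);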

The SP will then infer the data types and create two SQL statements. One statement will create the new table with the inferred data types. One statement will copy from the untyped (all varchar) table to the new table with appropriate wrappers such as try_multi_timestamp(), a UDF that extends try_to_timestamp() to try various common formats.
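
To make that concrete, here is a hypothetical example of the pair of statements the SP emits (table and column names are illustrative; the real output depends on your data):

create or replace table "TEST"."PUBLIC"."LINEITEM_TYPED"
(
"ORDER_TS" timestamp,
"QUANTITY" number(38,0),
"COMMENT" string
);

insert into "TEST"."PUBLIC"."LINEITEM_TYPED" select
try_multi_timestamp(trim("ORDER_TS")),
try_to_number(trim("QUANTITY"), 38, 0),
"COMMENT"
from "TEST"."PUBLIC"."LINEITEM_RAW" ;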

I meant to extend this so that it didn't require the untyped (all varchar) table at all, but haven't gotten around to it. Since it's come up here, I may circle back and update the SP with that capability. You can specify a query that reads directly from the stage, but you'd have to use $1, $2... with aliases for the column names (or else the DDL will try to create column names like $1). If the query runs directly against a stage, for the old DB, schema, and table, you could put in whatever because that's only used to generate an insert from select statement.
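
For instance, the aliasing pattern would look like this (the stage and file format names are assumptions):

select $1 as FIRST_NAME, $2 as LAST_NAME, $3 as SIGNUP_TS
from @MY_STAGE (file_format => 'MY_CSV_FORMAT')
limit 10000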

-- This shows how to use the SP on the Snowflake TPCH sample, but it could be any query.
-- Keep the row count down to reduce the time it takes to infer the types.
call infer_data_types('select * from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM limit 10000', 
                      'SNOWFLAKE_SAMPLE_DATA', 'TPCH_SF1', 'LINEITEM',
                      'TEST', 'PUBLIC', 'LINEITEM');

create or replace procedure INFER_DATA_TYPES(SOURCE_QUERY string,
                                             DATABASE_OLD string,
                                             SCHEMA_OLD string,
                                             TABLE_OLD string,
                                             DATABASE_NEW string,
                                             SCHEMA_NEW string,
                                             TABLE_NEW string)
returns string
language javascript
as
$$

/****************************************************************************************************
*                                                                                                   *
*  DataType Classes                                        
*                                                                                                   *
****************************************************************************************************/

class Query{
    constructor(statement){
        this.statement = statement;
    }
}


class DataType {
    constructor(db, schema, table, column, sourceQuery) {
        this.db = db;
        this.schema = schema;
        this.table = table;
        this.sourceQuery = sourceQuery;
        this.column = column;
        this.insert = '"@~COLUMN~@"';
        this.totalCount = 0;
        this.notNullCount = 0;
        this.typeCount = 0;
        this.blankCount = 0;
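        // isCorrectType() accepts a candidate type when at least minTypeOf of the
        // non-null, non-blank values cast to it successfully.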
        this.minTypeOf  = 0.95;
        this.minNotNull = 1.00;
    }
    setSQL(sqlTemplate){
        this.sql = sqlTemplate;
        this.sql = this.sql.replace(/@~DB~@/g,     this.db);
        this.sql = this.sql.replace(/@~SCHEMA~@/g, this.schema);
        this.sql = this.sql.replace(/@~TABLE~@/g,  this.table);
        this.sql = this.sql.replace(/@~COLUMN~@/g, this.column);
    }
    getCounts(){
        var rs;
        rs = GetResultSet(this.sql);
        rs.next();
        this.totalCount   = rs.getColumnValue("TOTAL_COUNT");
        this.notNullCount = rs.getColumnValue("NON_NULL_COUNT");
        this.typeCount    = rs.getColumnValue("TO_TYPE_COUNT");
        this.blankCount   = rs.getColumnValue("BLANK");
    }
    isCorrectType(){
        return (this.typeCount / (this.notNullCount - this.blankCount) >= this.minTypeOf);
    }
    isNotNull(){
        return (this.notNullCount / this.totalCount >= this.minNotNull);
    }
}

class TimestampType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "timestamp";
        this.insert = 'try_multi_timestamp(trim("@~COLUMN~@"))';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

class IntegerType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "number(38,0)";
        this.insert = 'try_to_number(trim("@~COLUMN~@"), 38, 0)';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

class DoubleType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "double";
        this.insert = 'try_to_double(trim("@~COLUMN~@"))';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

class BooleanType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "boolean";
        this.insert = 'try_to_boolean(trim("@~COLUMN~@"))';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

 // The catch-all is the STRING data type
class StringType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "string";
        this.totalCount   = 1;
        this.notNullCount = 0;
        this.typeCount    = 1;
        this.minTypeOf    = 0;
        this.minNotNull   = 1;
    }
}

/****************************************************************************************************
*                                                                                                   *
*  Main function                                                                                    *
*                                                                                                   *
****************************************************************************************************/

var pass    = 0;
var column;
var typeOf;
var ins = '';

var newTableDDL = '';
var insertDML   = '';

var columnRS = GetResultSet(GetTableColumnsSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD));

while (columnRS.next()){
    pass++;
    if(pass > 1){
        newTableDDL += ",\n";
        insertDML   += ",\n";
    }
    column = columnRS.getColumnValue("COLUMN_NAME");
    typeOf = InferDataType(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD, column, SOURCE_QUERY);
    newTableDDL += '"' + typeOf.column + '" ' + typeOf.syntax;
    ins = typeOf.insert;
    insertDML   += ins.replace(/@~COLUMN~@/g, typeOf.column);
}

return GetOpeningComments()                                     +
       GetDDLPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW)     +
       newTableDDL                                              +
       GetDDLSuffixSQL()                                        +
       GetDividerSQL()                                          +
       GetInsertPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW)  +
       insertDML                                                +
       GetInsertSuffixSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD)  ;

/****************************************************************************************************
*                                                                                                   *
*  Helper functions                                                                                 *
*                                                                                                   *
****************************************************************************************************/

function InferDataType(db, schema, table, column, sourceQuery){
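    // Try candidate types in a fixed order; the first one whose cast-success
    // rate clears the threshold wins. StringType always passes, so it acts as
    // the catch-all.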

    var typeOf;

    typeOf = new IntegerType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new DoubleType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new BooleanType(db, schema, table, column, sourceQuery);        // May want to do a distinct and look for two values
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new TimestampType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new StringType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    return null;
}

/****************************************************************************************************
*                                                                                                   *
*  SQL Template Functions                                                                           *
*                                                                                                   *
****************************************************************************************************/

function GetCheckTypeSQL(insert, sourceQuery){
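
// Builds a SQL statement that, for a single column, counts total rows, non-null
// values, values the candidate cast expression converts successfully, and blank
// strings. The caller uses these counts to decide whether the type fits.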

var sql = 
`
select      count(1)                              as TOTAL_COUNT,
            count("@~COLUMN~@")                   as NON_NULL_COUNT,
            count(${insert})                      as TO_TYPE_COUNT,
            sum(iff(trim("@~COLUMN~@")='', 1, 0)) as BLANK
--from        "@~DB~@"."@~SCHEMA~@"."@~TABLE~@";
from        (${sourceQuery})
`;

return sql;
}

function GetTableColumnsSQL(dbName, schemaName, tableName){

var sql = 
`
select  COLUMN_NAME 
from    ${dbName}.INFORMATION_SCHEMA.COLUMNS
where   TABLE_CATALOG = '${dbName}' and
        TABLE_SCHEMA  = '${schemaName}' and
        TABLE_NAME    = '${tableName}'
order by ORDINAL_POSITION;
`;
  
return sql;
}

function GetOpeningComments(){
return `
/**************************************************************************************************************
*                                                                                                             *
*   Copy and paste into a worksheet to create the typed table and insert into the new table from the old one. *
*                                                                                                             *
**************************************************************************************************************/
`;
}

function GetDDLPrefixSQL(db, schema, table){

var sql =
`
create or replace table "${db}"."${schema}"."${table}"
(
`;

    return sql;
}

function GetDDLSuffixSQL(){
    return "\n);";
}

function GetDividerSQL(){
return `
/**************************************************************************************************************
*                                                                                                             *
*   The SQL statement below attempts to copy all rows from the string table to the typed table.              *
*                                                                                                             *
**************************************************************************************************************/
`;
}

function GetInsertPrefixSQL(db, schema, table){
var sql =
`\ninsert into "${db}"."${schema}"."${table}" select\n`;
return sql;
}

function GetInsertSuffixSQL(db, schema, table){
var sql =
`\nfrom "${db}"."${schema}"."${table}" ;`;
return sql;
}



/****************************************************************************************************
*                                                                                                   *
*  SQL functions                                                                                    *
*                                                                                                   *
****************************************************************************************************/

function GetResultSet(sql){
    cmd1 = {sqlText: sql};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    rs = stmt.execute();
    return rs;
}

function ExecuteNonQuery(queryString) {
    var out = '';
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    rs = stmt.execute();
}

function ExecuteSingleValueQuery(columnName, queryString) {
    var out;
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    try{
        rs = stmt.execute();
        rs.next();
        return rs.getColumnValue(columnName);
    }
    catch(err) {
        if (err.message.substring(0, 18) == "ResultSet is empty"){
            throw "ERROR: No rows returned in query.";
        } else {
            throw "ERROR: " + err.message.replace(/\n/g, " ");
        } 
    }
    return out;
}

function ExecuteFirstValueQuery(queryString) {
    var out;
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    try{
        rs = stmt.execute();
        rs.next();
        return rs.getColumnValue(1);
    }
    catch(err) {
        if (err.message.substring(0, 18) == "ResultSet is empty"){
            throw "ERROR: No rows returned in query.";
        } else {
            throw "ERROR: " + err.message.replace(/\n/g, " ");
        } 
    }
    return out;
}

function getQuery(sql){
    var cmd = {sqlText: sql};
    var query = new Query(snowflake.createStatement(cmd));
    try {
        query.resultSet = query.statement.execute();
    } catch (err) {
        throw "ERROR: " + err.message.replace(/\n/g, " ");
    }
    return query;
}

$$;
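
Note that try_multi_timestamp() is a UDF the generated insert expects to exist; it is not defined above. A minimal sketch of such a UDF (the format list is an assumption; add whichever formats are common in your files):

create or replace function try_multi_timestamp(S string)
returns timestamp
as
$$
    coalesce(try_to_timestamp(S),
             try_to_timestamp(S, 'yyyy-mm-dd hh24:mi:ss'),
             try_to_timestamp(S, 'mm/dd/yyyy hh24:mi:ss'),
             try_to_timestamp(S, 'yyyy-mm-dd'),
             try_to_timestamp(S, 'mm/dd/yyyy'))
$$;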
