How to make MSCK REPAIR TABLE execute automatically in AWS Athena


Problem Description


I have a Spark batch job which is executed hourly. Each run generates and stores new data in S3 with the directory naming pattern DATA/YEAR=?/MONTH=?/DATE=?/datafile.

After uploading the data to S3, I want to investigate it using Athena. Moreover, I would like to visualize it in QuickSight by connecting to Athena as a data source.

The problem is that, after each run of my Spark batch, the newly generated data stored in S3 will not be discovered by Athena unless I manually run the query MSCK REPAIR TABLE.

Is there a way to make Athena update the data automatically, so that I can create a fully automatic data visualization pipeline?

Solution

There are a number of ways to schedule this task. How do you schedule your workflows? Do you use a system like Airflow, Luigi, Azkaban, or cron, or do you use AWS Data Pipeline?

From any of these, you should be able to fire off the following CLI command.

$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"
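For instance, with Airflow you could wrap that same CLI command in an hourly task. The following is only a minimal sketch, assuming Airflow 2.x with the AWS CLI configured on the worker; the DAG id, schedule, table name, and output bucket are placeholders to adapt to your setup:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="athena_repair_partitions",      # placeholder DAG id
    schedule_interval="@hourly",            # matches the hourly Spark batch
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # Re-discover new S3 partitions after each batch run
    repair_table = BashOperator(
        task_id="msck_repair",
        bash_command=(
            'aws athena start-query-execution '
            '--query-string "MSCK REPAIR TABLE some_database.some_table" '
            '--result-configuration "OutputLocation=s3://SOMEPLACE"'
        ),
    )

In practice you would make this task run downstream of the task that launches the Spark job, so the repair only fires after new data has landed.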

Another option would be AWS Lambda. You could have a function that calls MSCK REPAIR TABLE some_database.some_table in response to a new upload to S3.

An example Lambda function could be written as follows:

import boto3

def lambda_handler(event, context):
    bucket_name = 'some_bucket'

    client = boto3.client('athena')

    # Where Athena should write the query results
    config = {
        'OutputLocation': 's3://' + bucket_name + '/',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
    }

    # Query execution parameters
    # (named query_context so it does not shadow the handler's context argument)
    sql = 'MSCK REPAIR TABLE some_database.some_table'
    query_context = {'Database': 'some_database'}

    client.start_query_execution(QueryString=sql,
                                 QueryExecutionContext=query_context,
                                 ResultConfiguration=config)

You would then configure a trigger to execute your Lambda function when new data are added under the DATA/ prefix in your bucket.
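That wiring can be done in the S3 console, or scripted. The snippet below is only a sketch of the boto3 calls involved; the function name, bucket name, statement id, and Lambda ARN are placeholders, and it assumes you are allowed to modify both the function's resource policy and the bucket's notification configuration:

import boto3

s3 = boto3.client('s3')
lambda_client = boto3.client('lambda')

# Allow S3 to invoke the function (placeholder names and ARNs)
lambda_client.add_permission(
    FunctionName='athena-msck-repair',
    StatementId='s3-invoke',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::some_bucket'
)

# Invoke the function whenever an object is created under the DATA/ prefix
s3.put_bucket_notification_configuration(
    Bucket='some_bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:athena-msck-repair',
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [
                {'Name': 'prefix', 'Value': 'DATA/'}
            ]}}
        }]
    }
)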

Ultimately, explicitly rebuilding the partitions after you run your Spark job using a job scheduler has the advantage of being self-documenting. On the other hand, AWS Lambda is convenient for jobs like this one.
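If you take the scheduler route, the post-job step can also wait for the repair to finish before anything downstream (for example a QuickSight refresh) depends on the new partitions. A minimal sketch, reusing the same placeholder database, table, and bucket as above:

import time
import boto3

athena = boto3.client('athena')

# Start the partition repair after the Spark job has written its output
resp = athena.start_query_execution(
    QueryString='MSCK REPAIR TABLE some_database.some_table',
    QueryExecutionContext={'Database': 'some_database'},
    ResultConfiguration={'OutputLocation': 's3://some_bucket/'}
)
query_id = resp['QueryExecutionId']

# Poll until Athena reports a terminal state
while True:
    state = athena.get_query_execution(
        QueryExecutionId=query_id
    )['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(5)

if state != 'SUCCEEDED':
    raise RuntimeError('MSCK REPAIR TABLE finished with state ' + state)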
