How to make MSCK REPAIR TABLE execute automatically in AWS Athena
Problem description
I have a Spark batch job that runs hourly. Each run generates new data and stores it in S3 under the directory naming pattern DATA/YEAR=?/MONTH=?/DATE=?/datafile.
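The YEAR=?/MONTH=?/DATE=? directories follow the Hive-style name=value partition convention, which is the layout MSCK REPAIR TABLE looks for when it adds partitions to a table. As an illustration only, the hypothetical helper partition_values below shows how a single object key maps to partition column values. The root prefix DATA/ comes from the question; the sample date is made up.

```python
def partition_values(key, root='DATA/'):
    """Return the Hive-style partition values encoded in an S3 object key.

    Hypothetical helper for illustration, e.g.
    'DATA/YEAR=2017/MONTH=09/DATE=01/datafile'
    -> {'YEAR': '2017', 'MONTH': '09', 'DATE': '01'}
    """
    if not key.startswith(root):
        raise ValueError(f'{key!r} is not under {root!r}')
    # Drop the root prefix and the trailing file name; keep the directory segments
    segments = key[len(root):].split('/')[:-1]
    values = {}
    for segment in segments:
        # Each partition directory must look like name=value
        name, sep, value = segment.partition('=')
        if not sep:
            raise ValueError(f'segment {segment!r} is not of the form name=value')
        values[name] = value
    return values
```

A directory that does not follow the name=value form is rejected here, which mirrors why data written under a different layout would not be picked up as a partition.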
After the data is uploaded to S3, I want to query it with Athena. I would also like to visualize it in QuickSight, using Athena as the data source.
The problem is that after each run of my Spark batch job, Athena does not discover the newly generated data in S3 unless I manually run the query MSCK REPAIR TABLE.
Is there a way to make Athena update the data automatically, so that I can create a fully automatic data visualization pipeline?
Answer
There are a number of ways to schedule this task. How do you schedule your workflows? Do you use a system like Airflow, Luigi, Azkaban, or cron, or do you use AWS Data Pipeline?
From any of these, you should be able to run the following AWS CLI command:
```shell
$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"
```
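The CLI command only starts the query; it does not wait for it to finish. If a scheduler task should block until the partitions are registered (for example, before a downstream QuickSight refresh), the same query can be started from Python and polled. This is a minimal sketch, assuming a boto3 Athena client is passed in. repair_table, the poll interval and the timeout are hypothetical choices, while start_query_execution, get_query_execution and the QUEUED/RUNNING/SUCCEEDED/FAILED/CANCELLED states are part of the boto3 Athena client.

```python
import time

# Athena query states after which the query will not change any more
TERMINAL_STATES = {'SUCCEEDED', 'FAILED', 'CANCELLED'}


def repair_table(athena, table, output_location,
                 poll_seconds=2.0, timeout_seconds=300.0):
    """Run MSCK REPAIR TABLE and wait for it to finish (hypothetical helper).

    `athena` is assumed to be a boto3 Athena client, e.g. boto3.client('athena').
    Returns the final query state, or raises TimeoutError.
    """
    response = athena.start_query_execution(
        QueryString=f'MSCK REPAIR TABLE {table}',
        ResultConfiguration={'OutputLocation': output_location},
    )
    query_id = response['QueryExecutionId']
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status['QueryExecution']['Status']['State']
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() > deadline:
            raise TimeoutError(f'query {query_id} still {state} after {timeout_seconds}s')
        # Still QUEUED or RUNNING: wait before asking again
        time.sleep(poll_seconds)
```

A scheduled task could call it as repair_table(boto3.client('athena'), 'some_database.some_table', 's3://SOMEPLACE/') and fail the task unless the result is 'SUCCEEDED'. Passing the client in, rather than creating it inside, keeps the function easy to exercise with a stand-in object.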
Another option would be AWS Lambda. You could have a function that calls MSCK REPAIR TABLE some_database.some_table in response to a new upload to S3.
An example Lambda function could look like this:
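```python
import boto3

def lambda_handler(event, context):
    bucket_name = 'some_bucket'
    client = boto3.client('athena')

    # Athena writes query results here, encrypted with S3-managed keys
    config = {
        'OutputLocation': 's3://' + bucket_name + '/',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
    }

    # Query execution parameters
    sql = 'MSCK REPAIR TABLE some_database.some_table'
    # Named query_context so it does not shadow the handler's `context` argument
    query_context = {'Database': 'some_database'}

    # Starts the query and returns immediately; it does not wait for completion
    response = client.start_query_execution(QueryString=sql,
                                            QueryExecutionContext=query_context,
                                            ResultConfiguration=config)
    return response['QueryExecutionId']
```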
You would then configure an S3 trigger that executes your Lambda function when new data is added under the DATA/ prefix in your bucket.
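A prefix filter on the trigger already limits which uploads invoke the function, but the handler can also inspect the event itself, for example to skip the repair when no object under DATA/ arrived. This is a minimal sketch, assuming the standard S3 event notification shape (Records[].s3.object.key); new_keys_under_prefix is a hypothetical helper name. Object keys in S3 event notifications are URL-encoded, so they are decoded with urllib.parse.unquote_plus.

```python
from urllib.parse import unquote_plus


def new_keys_under_prefix(event, prefix='DATA/'):
    """Return the decoded object keys in an S3 event that fall under `prefix`.

    Hypothetical helper; assumes the S3 event notification layout
    event['Records'][i]['s3']['object']['key'].
    """
    keys = []
    for record in event.get('Records', []):
        # Keys arrive URL-encoded (e.g. '=' as %3D, space as '+')
        key = unquote_plus(record['s3']['object']['key'])
        if key.startswith(prefix):
            keys.append(key)
    return keys
```

In the Lambda handler, a guard such as "if not new_keys_under_prefix(event): return None" placed before start_query_execution would avoid starting a query for unrelated uploads.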
Ultimately, explicitly rebuilding the partitions from your job scheduler right after the Spark job runs has the advantage of being self-documenting. AWS Lambda, on the other hand, is convenient for small jobs like this one.