Airflow s3 connection using UI
Question
I've been trying to use Airflow to schedule a DAG. One of the DAGs includes a task which loads data from an S3 bucket.
For that purpose I need to set up an S3 connection, but the UI provided by Airflow isn't that intuitive (http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections). Has anyone succeeded in setting up the S3 connection, and if so, are there any best practices you folks follow?
Thanks.
Answer
It's hard to find references, but after digging a bit I was able to make it work.
Create a new connection with the following attributes:
Conn Id: my_conn_S3
Conn Type: S3
Extra:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
Long version, setting up the UI connection:
- On the Airflow UI, go to Admin > Connections.
- Create a new connection with the following attributes:
  - Conn Id: my_conn_S3
  - Conn Type: S3
  - Extra: {"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
- Leave all the other fields (Host, Schema, Login) blank.
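One thing worth checking before you save the connection: the Extra field is a JSON document, and if it is malformed the credentials may not be picked up. A quick sanity check (just a sketch, not part of the original answer) is to parse the string locally before pasting it into the UI:

```python
import json

# The Extra field for the S3 connection is a JSON document with two keys
# (placeholder values shown; substitute your real credentials).
extra = ('{"aws_access_key_id": "_your_aws_access_key_id_", '
         '"aws_secret_access_key": "_your_aws_secret_access_key_"}')

parsed = json.loads(extra)  # raises ValueError if the JSON is malformed
print(sorted(parsed))       # ['aws_access_key_id', 'aws_secret_access_key']
```

If `json.loads` raises, fix the quoting before touching the UI; a common mistake is using single quotes, which are not valid JSON.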
- Before running the DAG, ensure you have an S3 bucket named 'S3-Bucket-To-Watch'.
- Add the s3_dag_test.py below to your Airflow dags folder (~/airflow/dags).
- Start 'airflow webserver'.
- Go to the Airflow UI (http://localhost:8383/).
- Start 'airflow scheduler'.
- Turn on the 's3_dag_test' DAG on the main DAGs view.
- Select 's3_dag_test' to show the DAG details.
- On the Graph View you should be able to see its current state.
- The 'check_s3_for_file_in_s3' task should be active and running.
- Now, add a file named 'file-to-watch-1' to your 'S3-Bucket-To-Watch' bucket.
- The first task should complete, and the second should start and finish.
To use this connection, below you can find a simple S3 Sensor test. The idea of this test is to set up a sensor that watches for files in S3 (the T1 task), and once the condition below is satisfied, trigger a bash command (the T2 task).
The schedule_interval in the DAG definition is set to '@once' to facilitate debugging.
To run it again, leave everything as it is, remove the files in the bucket, and try again by selecting the first task (in the Graph View) and choosing 'Clear' with the 'Past', 'Future', 'Upstream', 'Downstream' options selected. This should kick off the DAG again.
Let me know how it goes.
""" S3 Sensor Connection Test """ from airflow import DAG from airflow.operators import SimpleHttpOperator, HttpSensor, BashOperator, EmailOperator, S3KeySensor from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2016, 11, 1), 'email': ['something@here.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 5, 'retry_delay': timedelta(minutes=5) } dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once') t1 = BashOperator( task_id='bash_test', bash_command='echo "hello, it should work" > s3_conn_test.txt', dag=dag) sensor = S3KeySensor( task_id='check_s3_for_file_in_s3', bucket_key='file-to-watch-*', wildcard_match=True, bucket_name='S3-Bucket-To-Watch', s3_conn_id='my_conn_S3', timeout=18*60*60, poke_interval=120, dag=dag) t1.set_upstream(sensor)
Main References:
- https://gitter.im/apache/incubator-airflow
- https://groups.google.com/forum/#!topic/airbnb_airflow/TXsJNOBBfig
- https://github.com/apache/incubator-airflow