Airflow s3 connection using UI


Problem description

I've been trying to use Airflow to schedule a DAG. One of the DAGs includes a task which loads data from an S3 bucket.

For the purpose above I need to set up an S3 connection. But the UI provided by Airflow isn't that intuitive (http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections). Has anyone succeeded in setting up the S3 connection, and if so, are there any best practices you folks follow?

Thanks.

Recommended answer

It's hard to find references, but after digging a bit I was able to make it work.

Create a new connection with the following attributes:

Conn Id: my_conn_S3

Conn Type: S3

Extra:

{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}






Long version, setting up UI connection:

  • On Airflow UI, go to Admin > Connections
  • Create a new connection with the following attributes:
    • Conn Id: my_conn_S3
    • Conn Type: S3
    • Extra: {"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
    • Leave all the other fields (Host, Schema, Login) blank.
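
To sanity-check the connection outside of a DAG, you can poke the bucket with the hook directly. A minimal sketch, not part of the original answer, assuming an older 1.x release where S3Hook still accepts s3_conn_id (newer releases use aws_conn_id and a different import path):

from airflow.hooks.S3_hook import S3Hook

# Assumes the old boto-based S3Hook that takes s3_conn_id.
hook = S3Hook(s3_conn_id='my_conn_S3')
print(hook.check_for_bucket('S3-Bucket-To-Watch'))  # True if the credentials can see the bucket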

To use this connection, below you can find a simple S3 sensor test. The idea of this test is to set up a sensor that watches for files in S3 (the S3KeySensor task) and, once that condition is satisfied, triggers a bash command (the BashOperator task).


  • Before running the DAG, make sure you have an S3 bucket named 'S3-Bucket-To-Watch'.
  • Add the s3_dag_test.py below to your airflow dags folder (~/airflow/dags).
  • Start airflow webserver.
  • Go to the Airflow UI (http://localhost:8383/).
  • Start airflow scheduler.
  • Turn on the 's3_dag_test' DAG on the main DAGs view.
  • Select 's3_dag_test' to show the dag details.
  • On the Graph View you should be able to see its current state.
  • The 'check_s3_for_file_in_s3' task should be active and running.
  • Now, add a file named 'file-to-watch-1' to your 'S3-Bucket-To-Watch' (see the upload sketch after this list).
  • The first task should then complete, and the second should start and finish.
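
The "add a file" step above can be done from the S3 console or from a short script. A hedged sketch, not part of the original answer, assuming boto3 is installed and reusing the same credentials as the connection:

import boto3

# Upload an empty marker object whose key matches the sensor's
# wildcard pattern 'file-to-watch-*'.
s3 = boto3.client(
    's3',
    aws_access_key_id='_your_aws_access_key_id_',
    aws_secret_access_key='_your_aws_secret_access_key_')

s3.put_object(Bucket='S3-Bucket-To-Watch', Key='file-to-watch-1', Body=b'')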

The schedule_interval in the dag definition is set to '@once', to facilitate debugging.
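
If you want the check to run on a recurring basis instead, you could swap in a cron-style schedule. A hedged variant, not part of the original answer:

dag = DAG('s3_dag_test', default_args=default_args,
          schedule_interval='0 * * * *')  # e.g. run hourly instead of only once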

To run it again, leave everything as it is, remove the files in the bucket, and try again by selecting the first task (in the Graph View) and choosing 'Clear' with the 'Past', 'Future', 'Upstream', 'Downstream' ... options. This should kick off the DAG again.

Let me know how it went.

          """
          S3 Sensor Connection Test
          """
          
          from airflow import DAG
          from airflow.operators import SimpleHttpOperator, HttpSensor,   BashOperator, EmailOperator, S3KeySensor
          from datetime import datetime, timedelta
          
          default_args = {
              'owner': 'airflow',
              'depends_on_past': False,
              'start_date': datetime(2016, 11, 1),
              'email': ['something@here.com'],
              'email_on_failure': False,
              'email_on_retry': False,
              'retries': 5,
              'retry_delay': timedelta(minutes=5)
          }
          
          dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')
          
          t1 = BashOperator(
              task_id='bash_test',
              bash_command='echo "hello, it should work" > s3_conn_test.txt',
              dag=dag)
          
          sensor = S3KeySensor(
              task_id='check_s3_for_file_in_s3',
              bucket_key='file-to-watch-*',
              wildcard_match=True,
              bucket_name='S3-Bucket-To-Watch',
              s3_conn_id='my_conn_S3',
              timeout=18*60*60,
              poke_interval=120,
              dag=dag)
          
          t1.set_upstream(sensor)
          






Main references:
