如何使用Python在Airflow中成功触发另一个DAG时触发DAG? [英] How to Trigger a DAG on the success of a another DAG in Airflow using Python?

查看:600
本文介绍了如何使用Python在Airflow中成功触发另一个DAG时触发DAG?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个python DAG Parent Job 和DAG Child Job 。成功完成每天运行的 Parent Job 任务后,应触发 Child Job 中的任务。如何添加外部作业触发器?



我的代码

 从datetime导入datetime,timedelta 
从airflow导入DAG
从airflow.operators.postgres_operator导入PostgresOperator
从utils导入FAILURE_EMAILS

昨天=日期时间.combine(datetime.today()-timedelta(1),datetime.min.time())


default_args = {
'所有者':'气流',
'depends_on_past':否,
'start_date':昨天,
'email':FAILURE_EMAILS,
'email_on_failure':False,
'email_on_retry':False,
'重试':1,
'retry_delay':timedelta(minutes = 5)
}

dag = DAG('Child Job',default_args = default_args,schedule_interval ='@ daily')

execute_notebook = PostgresOperator(
task_id ='data_sql',
postgres_conn_id ='REDSHIFT_CONN',
sql = SELECT * FROM athena_rs。装运限制5,
dag = dag


解决方案

答案在



Parent_dag





Child_dag




I have a python DAG Parent Job and DAG Child Job. The tasks in the Child Job should be triggered on the successful completion of the Parent Job tasks which are run daily. How can add external job trigger ?

MY CODE

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')

execute_notebook = PostgresOperator(
  task_id='data_sql',
  postgres_conn_id='REDSHIFT_CONN',
  sql="SELECT * FROM athena_rs.shipments limit 5",
  dag=dag
)

解决方案

Answer is in this thread already. Below is demo code:

Parent dag:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)
cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner

Child dag:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to listen to the Parent_dag and cook_dinner task
# when cook_dinner is finished, Child_dag will be triggered
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    execution_delta=timedelta(hours=1),
    timeout=3600,
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)
play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food

Images:

Dags

Parent_dag

Child_dag

这篇关于如何使用Python在Airflow中成功触发另一个DAG时触发DAG?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆