How to Trigger a DAG on the success of another DAG in Airflow using Python?
Problem Description
I have a Python DAG Parent Job and a DAG Child Job. The tasks in Child Job should be triggered on the successful completion of the Parent Job tasks, which run daily. How can I add an external job trigger?
MY CODE
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

from utils import FAILURE_EMAILS

# Midnight at the start of the previous calendar day
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# dag_id must not contain spaces, so use an underscore
dag = DAG('Child_Job', default_args=default_args, schedule_interval='@daily')

execute_notebook = PostgresOperator(
    task_id='data_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM athena_rs.shipments limit 5",
    dag=dag
)
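As an aside, the yesterday expression in the code above combines yesterday's date with datetime.min.time(), which yields midnight at the start of the previous calendar day. A quick standalone sketch of just that expression:

```python
from datetime import datetime, timedelta

# Same expression as in the DAG above: yesterday's date at 00:00:00.
# datetime.combine() takes the date part of the first argument and the
# given time, so the result is the previous day's midnight.
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

print(yesterday)
```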
Solution
The answer is in this thread already. Below is demo code:
Parent dag:
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)

cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner
Child dag:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to listen to Parent_dag's cook_dinner task;
# once cook_dinner has finished for the same execution date,
# the downstream tasks in Child_dag are released.
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    # Both DAGs run @daily on the same schedule, so no execution_delta
    # is needed; set execution_delta only when the schedules are offset.
    timeout=3600,
    dag=dag,
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)

play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
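One detail worth calling out: ExternalTaskSensor matches the parent's task instance by execution date. By default it waits on the parent run with the same execution date as the child; if execution_delta is set, it waits on the run that far back instead. A minimal pure-Python model of that lookup (the function name target_execution_date is ours, for illustration only):

```python
from datetime import datetime, timedelta

def target_execution_date(child_execution_date, execution_delta=None):
    """Simplified model of which parent execution date
    ExternalTaskSensor waits on."""
    if execution_delta is None:
        # Same schedule: the sensor pokes the parent run with the
        # identical execution date.
        return child_execution_date
    # Offset schedules: the sensor pokes execution_date - execution_delta.
    return child_execution_date - execution_delta

# With both DAGs on @daily, no delta is needed:
print(target_execution_date(datetime(2020, 4, 29)))  # 2020-04-29 00:00:00
# If the parent ran one hour earlier than the child, you would pass
# execution_delta=timedelta(hours=1):
print(target_execution_date(datetime(2020, 4, 29, 1), timedelta(hours=1)))
```

This is why setting execution_delta when both DAGs share a schedule makes the sensor look for a parent run that never existed, and it times out.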
[Screenshots in the original answer show the DAG list and the Parent_dag and Child_dag graph views.]