How to run airflow DAG with conditional tasks
Question
There are 6 tasks in total. They need to be executed based on the value of one field (flag_value) coming in the input JSON. If the value of flag_value is true, then all tasks need to execute like this: first task_1, then task_2 and task_3 together, in parallel with task_4 and in parallel with task_5. Once all of those finish, task_6 runs. Since I am new to Airflow and DAGs, I don't know how to set this up.
If the value of flag_value is false, then the order is simply sequential:
task_1 >> task_4 >> task_5 >> task_6.
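Written as a plain dependency chain, the true-flag layout I am after would look roughly like this (fan out after task_1, join before task_6):

task_1 >> [task_2, task_3, task_4, task_5] >> task_6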
Below is my code for the DAG.
from airflow import DAG
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False
}

dag = DAG(
    'DAG_FOR_TEST',
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=3,
    start_date=datetime(2020, 7, 8)
)
#################### CREATE TASK #####################################
task_1 = DatabricksSubmitRunOperator(
    task_id='task_1',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries=[
        {'jar': 'dbfs:/task_1/task_1.jar'}
    ],
    spark_jar_task={
        'main_class_name': 'com.task_1.driver.TestClass1',
        'parameters': ['{{ dag_run.conf.json }}']
    }
)
task_2 = DatabricksSubmitRunOperator(
    task_id='task_2',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries=[
        {'jar': 'dbfs:/task_2/task_2.jar'}
    ],
    spark_jar_task={
        'main_class_name': 'com.task_2.driver.TestClass2',
        'parameters': ['{{ dag_run.conf.json }}']
    }
)
task_3 = DatabricksSubmitRunOperator(
    task_id='task_3',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries=[
        {'jar': 'dbfs:/task_3/task_3.jar'}
    ],
    spark_jar_task={
        'main_class_name': 'com.task_3.driver.TestClass3',
        'parameters': ['{{ dag_run.conf.json }}']
    }
)
task_4 = DatabricksSubmitRunOperator(
    task_id='task_4',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries=[
        {'jar': 'dbfs:/task_4/task_4.jar'}
    ],
    spark_jar_task={
        'main_class_name': 'com.task_4.driver.TestClass4',
        'parameters': ['{{ dag_run.conf.json }}']
    }
)
task_5 = DatabricksSubmitRunOperator(
    task_id='task_5',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries=[
        {'jar': 'dbfs:/task_5/task_5.jar'}
    ],
    spark_jar_task={
        'main_class_name': 'com.task_5.driver.TestClass5',
        'parameters': ['json ={{ dag_run.conf.json }}']
    }
)
task_6 = DatabricksSubmitRunOperator(
    task_id='task_6',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries=[
        {'jar': 'dbfs:/task_6/task_6.jar'}
    ],
    spark_jar_task={
        'main_class_name': 'com.task_6.driver.TestClass6',
        'parameters': ['{{ dag_run.conf.json }}']
    }
)
flag_value='{{ dag_run.conf.json.flag_value }}'
#################### ORDER OF OPERATORS ###########################
if flag_value == 'true':
    task_1.dag = dag
    task_2.dag = dag
    task_3.dag = dag
    task_4.dag = dag
    task_5.dag = dag
    task_6.dag = dag
    task_1 >> [task_2, task_3] >> [task_4] >> [task_5] >> task_6  # Not sure correct
else:
    task_1.dag = dag
    task_4.dag = dag
    task_5.dag = dag
    task_6.dag = dag
    task_1 >> task_4 >> task_5 >> task_6
Answer
First of all, the dependencies are not correct; this should work:
task_1 >> [task_2, task_3] >> task_4 >> task_5 >> task_6
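(A list on one side of >> fans in or out to the task on the other side, so here task_4 starts only after both task_2 and task_3 have finished.)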
It is not possible to order tasks with list_1 >> list_2, but there are helper methods that provide this; see cross_downstream.
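A minimal sketch of cross_downstream (in Airflow 2 it is importable from airflow.models.baseoperator; older 1.10 releases had it in airflow.utils.helpers):

from airflow.models.baseoperator import cross_downstream

# Make every task in the first list an upstream of every task in the
# second list -- the effect "list_1 >> list_2" would have if it worked.
cross_downstream([task_2, task_3], [task_4, task_5])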
For branching, you can use BranchPythonOperator together with a change to the trigger rules of your tasks. Note that the top-level if in your DAG file cannot work as written: at parse time flag_value is just the literal string '{{ dag_run.conf.json.flag_value }}', because Jinja templates are only rendered when a task runs, so the branching has to happen inside an operator. I'm not sure about the following code, it could have minor errors, but the idea here works.
from airflow.operators.dummy import DummyOperator          # Airflow 2 import paths;
from airflow.operators.python import BranchPythonOperator  # 1.10 used *_operator modules

# task_4 would be skipped along with task_2/task_3 under the default
# "all_success" rule, so relax it to run as long as nothing upstream failed.
task_4.trigger_rule = "none_failed"

dummy = DummyOperator(task_id="dummy", dag=dag)

branch = BranchPythonOperator(
    task_id="branch",
    # jinja template returns string "True" or "False"
    python_callable=lambda f: ["task_2", "task_3"] if f == "True" else "dummy",
    op_kwargs={"f": flag_value},
    dag=dag)

task_1 >> branch
branch >> [task_2, task_3, dummy] >> task_4
task_4 >> task_5 >> task_6
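Since flag_value comes from the run configuration, a manual trigger has to supply a conf shaped like the one below (the clusterId value is made up). Note that Jinja renders a JSON boolean as the string "True"/"False", which is exactly what the branch callable compares against:

# Hypothetical conf for a manual run of DAG_FOR_TEST
conf = {
    "clusterId": "1234-567890-abc123",  # made-up Databricks cluster id
    "json": {"flag_value": True}
}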
There might be a better way.