How to really create n tasks in a SubDAG based on the result of a previous task
Question
I'm creating a dynamic DAG in Airflow using SubDAGs. What I need is for the number of tasks inside the SubDAG to be determined by the result of a previous task (the subtask_ids variable of the middle_section function should be the same variable as in the initial_task function).
The problem is that I can't access xcom inside the subdag function of a SubDagOperator, because I don't have any context there. Also, I can't reach any DB to read a value, because of the scheduler's DAG autodiscovery feature: middle_section is executed every few seconds.
How do you solve this: creating a dynamic number of tasks inside a SubDAG depending on the result of a previous task?
Here is the code I'm developing:
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}


def initial_task(**context):
    subtask_ids = [0, 1, 2]
    task_instance = context['ti']
    task_instance.xcom_push(key='depot_ids', value=subtask_ids)


def middle_section_task(subtask_id):
    print(subtask_id)


def middle_section(parent_dag, arg):
    subdag = DAG(dag_id=f'{parent_dag.dag_id}.middle',
                 default_args=args, schedule_interval='@once')
    subtask_ids = ''  # Read from xcom -- this is the part I can't solve
    for subtask_id in subtask_ids:
        PythonOperator(task_id=f'middle_section_task_{subtask_id}',
                       python_callable=middle_section_task,
                       op_kwargs={'subtask_id': subtask_id}, dag=subdag)
    return subdag


def end_task(**context):
    print('Finished')


dag = DAG(dag_id='stackoverflow', default_args=args, schedule_interval=None)

initial = PythonOperator(task_id='start_task', python_callable=initial_task,
                         provide_context=True, dag=dag)
middle = SubDagOperator(task_id='middle', subdag=middle_section(dag, args),
                        default_args=args, dag=dag)
end = PythonOperator(task_id='end_task', python_callable=end_task,
                     provide_context=True, dag=dag)

initial >> middle >> end
Answer
I had the same issue, and I couldn't fully solve it in an "Airflow way", since I believe the number of Airflow tasks and subtasks is fixed at the moment of DAG validation. No task is running during validation, so there is no way for Airflow to know beforehand how many subdag tasks will be scheduled.
The way I circumvented this issue might not be the best (I'm open to suggestions), but it works:
main_dag.py
# imports omitted for brevity

def get_info_from_db():
    # get info from the db or somewhere else; this info will define
    # the number of subdag tasks to run
    return urls, names


dag = DAG(...)

urls, names = get_info_from_db()

# You may ignore the dummy operators
start = DummyOperator(task_id='start', default_args=args, dag=dag)
sub_section = SubDagOperator(
    task_id='import-file',
    subdag=imported_subdag(DAG_NAME, 'subdag-name', args, urls=urls, file_names=names),
    default_args=args,
    dag=dag,
)
end = DummyOperator(task_id='end', default_args=args, dag=dag)

start.set_downstream(sub_section)
sub_section.set_downstream(end)
Then finally I have my subdag.py (make sure it is discoverable by Airflow, in case it lives in a separate file):
# imports omitted for brevity

def fetch_files(file_url, file_name):
    # get the file and save it to disk
    return file_location


# this is how I get the info returned from the previous task: fetch_files
def validate_file(task_id, **kwargs):
    ti = kwargs['ti']
    task = 'fetch_file-{}'.format(task_id)
    file_location = ti.xcom_pull(task_ids=task)


def imported_subdag(parent_dag_name, child_dag_name, args, urls, file_names):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    for i in range(len(urls)):
        # the task names must also be dynamic in order not to have duplicates
        validate_file_operator = PythonOperator(
            task_id='validate_file-{}'.format(i + 1),
            python_callable=validate_file,
            provide_context=True, dag=dag_subdag,
            op_kwargs={'task_id': i + 1})
        fetch_operator = PythonOperator(
            task_id='fetch_file-{}'.format(i + 1),
            python_callable=fetch_files, dag=dag_subdag,
            op_kwargs={'file_url': urls[i], 'file_name': file_names[i]})
        fetch_operator.set_downstream(validate_file_operator)
    return dag_subdag
Basically, my logic is that get_info_from_db() gets executed at the moment Airflow validates the DAG file, so all dags and subdags are properly scheduled dynamically. If I add or remove content in the db, the number of tasks to run will be updated at the next DAG validation.
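One caveat from the question still applies: the scheduler re-parses DAG files every few seconds, so get_info_from_db() would hit the database on every parse. A minimal sketch of one mitigation (my suggestion, not part of the original answer) is a module-level cache with a time-to-live, so the database is queried at most once per interval:

```python
import time

# Hypothetical helper: cache the parse-time DB lookup so frequent
# DAG-file re-parsing does not query the database every time.
_cache = {'value': None, 'fetched_at': 0.0}
CACHE_TTL_SECONDS = 300  # assumed refresh interval


def get_info_cached(fetch_fn):
    """Return the cached result of fetch_fn, refreshing it at most
    once every CACHE_TTL_SECONDS."""
    now = time.time()
    if _cache['value'] is None or now - _cache['fetched_at'] > CACHE_TTL_SECONDS:
        _cache['value'] = fetch_fn()
        _cache['fetched_at'] = now
    return _cache['value']
```

In main_dag.py one would then call get_info_cached(get_info_from_db) instead of get_info_from_db(). Note that Airflow may parse DAG files in fresh subprocesses, in which case each process keeps its own cache and a file- or Variable-based cache would be needed instead.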
This approach suited my use case, but I hope Airflow will support this feature (a dynamic number of tasks/subdag tasks) natively in the future.
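Another direction sometimes used (a sketch with hypothetical names, assuming a known upper bound on the number of subtasks): create a fixed maximum number of subdag tasks at parse time, and have each task pull the XCom value at run time, where a task-instance context does exist, skipping the slots that are not needed:

```python
# Hypothetical sketch: MAX_SUBTASKS parse-time slots; each slot decides
# at run time, via XCom, whether it has work to do. Names are illustrative.
MAX_SUBTASKS = 10  # assumed upper bound on len(subtask_ids)


def middle_section_task(slot_index, **context):
    ti = context['ti']
    # Pull the list pushed by initial_task in the parent DAG.
    subtask_ids = ti.xcom_pull(dag_id='stackoverflow',
                               task_ids='start_task',
                               key='depot_ids')
    if not subtask_ids or slot_index >= len(subtask_ids):
        print('slot %d: nothing to do' % slot_index)
        return None
    subtask_id = subtask_ids[slot_index]
    print('processing subtask %s' % subtask_id)
    return subtask_id
```

The subdag factory would then create MAX_SUBTASKS PythonOperators with op_kwargs={'slot_index': i}; the unused slots succeed as no-ops. The trade-off is a hard upper bound and a cluttered graph, which is why it is only a workaround.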