Proper way to create dynamic workflows in Airflow
Problem description
Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A? I have looked at subdags, but it looks like they can only work with a static set of tasks that must be determined at DAG creation.
Would DAG triggers work? And if so, could you please provide an example?
I have an issue where it is impossible to know the number of task B's that will be needed to calculate Task C until Task A has been completed. Each Task B.* will take several hours to compute and cannot be combined.
             |---> Task B.1 --|
             |---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
             |       ....     |
             |---> Task B.N --|
Idea #1
I don't like this solution because I have to create a blocking ExternalTaskSensor, and all the Task B.* will take between 2 and 24 hours to complete. So I do not consider this a viable solution. Surely there is an easier way? Or was Airflow not designed for this?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG through python_callable in TriggerDagRunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
Edit 1:
As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.
Recommended answer
Here is how I handled a similar request without any subdags:
First, create a method that returns whatever values you want:
def values_function():
    # return the values that drive the dynamic fan-out,
    # e.g. loaded from a config file or a database query
    return values
Next, create a method that will generate the jobs dynamically:
def group(number, **kwargs):
    # load the values if needed in the command you plan to execute
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    return BashOperator(
        task_id='JOB_NAME_{}'.format(number),
        bash_command='script.sh {} {}'.format(dyn_value, number),
        dag=dag)
Then combine them:
# assumes PythonOperator, DummyOperator, BashOperator and the dag
# object are imported/defined elsewhere in the DAG file
push_func = PythonOperator(
    task_id='push_func',
    provide_context=True,
    python_callable=values_function,
    dag=dag)

complete = DummyOperator(
    task_id='All_jobs_completed',
    dag=dag)

for i in values_function():
    push_func >> group(i) >> complete
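One caveat worth noting: the for loop above runs whenever the scheduler parses the DAG file, so values_function() must be able to return its values at parse time; the number of branches cannot truly depend on Task A's runtime output. A minimal, self-contained sketch of that expansion logic, without the Airflow operators (the values [1, 2, 3] and the helper names here are illustrative, not from the answer):

```python
def values_function():
    # illustrative stand-in for the answer's values_function();
    # in practice this might query a database or list files
    return [1, 2, 3]

def job_task_id(number):
    # mirrors the task_id pattern used in group() above
    return 'JOB_NAME_{}'.format(number)

# the for loop in the DAG file expands into one branch per value
task_ids = [job_task_id(n) for n in values_function()]
print(task_ids)  # ['JOB_NAME_1', 'JOB_NAME_2', 'JOB_NAME_3']
```

Because the expansion happens at parse time, a number of Task B's that is only known at run time would need a different mechanism, such as triggering a separately generated DAG as in Idea #1.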