我们如何使用 TriggerDagRunOperator 触发多个气流 dags? [英] How do we trigger multiple airflow dags using TriggerDagRunOperator?
问题描述
我有一个场景,其中一个特定的 dag 在完成时需要触发多个 dag,已使用 TriggerDagRunOperator 触发单个 dag,是否可以将多个 dag 传递给 TriggerDagRunOperator 以触发多个 dag?
I have a scenario wherein a particular dag upon completion needs to trigger multiple dags,have used TriggerDagRunOperator to trigger single dag,is it possible to pass multiple dags to the TriggerDagRunOperator to trigger multiple dags?
是否有可能只有在当前 dag 成功完成后才能触发.
And is it possible to trigger only upon successful completion of the current dag.
推荐答案
我遇到了同样的问题.并且没有开箱即用的解决方案,但我们可以为其编写自定义运算符.
I have faced the same problem. And there is no solution out of the box, but we can write a custom operator for it.
这里是自定义运算符的代码,它以 python_callable
和 trigger_dag_id
作为参数:
So here the code of a custom operator, that get python_callable
and trigger_dag_id
as arguments:
class TriggerMultiDagRunOperator(TriggerDagRunOperator):
@apply_defaults
def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
def execute(self, context):
session = settings.Session()
created = False
for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
if not dro or not isinstance(dro, DagRunOrder):
break
if dro.run_id is None:
dro.run_id = 'trig__' + datetime.utcnow().isoformat()
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True
)
created = True
self.log.info("Creating DagRun %s", dr)
if created is True:
session.commit()
else:
self.log.info("No DagRun created")
session.close()
trigger_dag_id
是我们想要多次运行的 dag id.
trigger_dag_id
is dag id what we want running multiple times.
python_callable
是一个函数,它应该返回一个 DagRunOrder
对象的列表,一个对象用于调度一个带有 dag_id trigger_dag_id
的 DAG 实例.
python_callable
is a function, it should return a list of DagRunOrder
objects, one object for schedule one instance of DAG with dag_id trigger_dag_id
.
GitHub 上的代码和示例:https://github.com/mastak/airflow_multi_dagrun关于此代码的更多说明:https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
Code and examples on GitHub: https://github.com/mastak/airflow_multi_dagrun Little bit more description about this code: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
这篇关于我们如何使用 TriggerDagRunOperator 触发多个气流 dags?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!