How do we trigger multiple Airflow DAGs using TriggerDagRunOperator?

Problem description

I have a scenario in which a particular DAG, upon completion, needs to trigger multiple DAGs. I have used TriggerDagRunOperator to trigger a single DAG. Is it possible to pass multiple DAGs to TriggerDagRunOperator so that it triggers all of them?

Also, is it possible to trigger them only upon successful completion of the current DAG?

Recommended answer

I faced the same problem. There is no out-of-the-box solution, but we can write a custom operator for it.

Here is the code of a custom operator that takes python_callable and trigger_dag_id as arguments:

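from datetime import datetime

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State


# Written against the Airflow 1.x API: DagRunOrder and the python_callable
# argument of TriggerDagRunOperator no longer exist in Airflow 2.
class TriggerMultiDagRunOperator(TriggerDagRunOperator):

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        # Extra positional and keyword arguments forwarded to python_callable.
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):
        session = settings.Session()
        created = False
        # Load the target DAG once instead of re-parsing the DAGs folder
        # for every requested run.
        dbag = DagBag(settings.DAGS_FOLDER)
        trigger_dag = dbag.get_dag(self.trigger_dag_id)

        for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
            # Stop at the first item that is not a DagRunOrder.
            if not dro or not isinstance(dro, DagRunOrder):
                break

            # Generate a run_id when the callable did not provide one.
            if dro.run_id is None:
                dro.run_id = 'trig__' + datetime.utcnow().isoformat()

            # Create one run of the target DAG; payload becomes its conf.
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True
            )
            created = True
            self.log.info("Creating DagRun %s", dr)

        if created:
            session.commit()
        else:
            self.log.info("No DagRun created")
        session.close()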

trigger_dag_id is the id of the DAG that we want to run multiple times.

python_callable is a function that should return a list (or any iterable) of DagRunOrder objects. Each object schedules one instance of the DAG whose dag_id is trigger_dag_id.
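As a rough illustration of what such a python_callable might look like, here is a minimal sketch. The function name generate_dag_run_orders, the table_names parameter, and the payload contents are made up for this example. The fallback DagRunOrder class is only a stand-in that mirrors the run_id and payload attributes, so the snippet can run where Airflow 1.x is not installed.

```python
try:
    # Location of DagRunOrder in Airflow 1.10.
    from airflow.operators.dagrun_operator import DagRunOrder
except ImportError:
    # Stand-in (assumption) exposing the same two attributes the operator reads.
    class DagRunOrder(object):
        def __init__(self, run_id=None, payload=None):
            self.run_id = run_id
            self.payload = payload


def generate_dag_run_orders(context, table_names=()):
    """Yield one DagRunOrder per table; each one becomes a run of trigger_dag_id."""
    for name in table_names:
        # run_id stays None so the operator generates one itself;
        # payload is handed to the triggered run as its conf.
        yield DagRunOrder(payload={"table": name})


# Hypothetical call, imitating how the operator invokes the callable.
orders = list(generate_dag_run_orders({}, table_names=["users", "orders"]))
print([order.payload for order in orders])
```

With the operator above, this callable would presumably be wired in as python_callable=generate_dag_run_orders together with op_kwargs={"table_names": [...]}, since execute() forwards op_args and op_kwargs after the context.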

Code and examples on GitHub: https://github.com/mastak/airflow_multi_dagrun
A little more description of this code: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
