Airflow TriggerDagRunOperator: how to change the execution date

Question

I noticed that for scheduled tasks the execution date is set in the past, according to the following explanation:

Airflow was developed as a solution for ETL needs. In the ETL world, you typically summarize data. So, if I want to summarize data for 2016-02-19, I would do it at 2016-02-20 midnight GMT, which would be right after all data for 2016-02-19 becomes available.
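
The quoted rule can be sketched in plain Python. This is only a simplified model for a fixed schedule interval, not Airflow code; the helper name `earliest_run_time` is hypothetical.

```python
from datetime import datetime, timedelta


def earliest_run_time(execution_date, schedule_interval):
    """Earliest wall-clock time at which a scheduled run for
    `execution_date` can start: the end of the interval it covers."""
    return execution_date + schedule_interval


# A daily run that summarizes the data for 2016-02-19 ...
execution_date = datetime(2016, 2, 19)
# ... is started once that day is over, i.e. at 2016-02-20 midnight.
run_time = earliest_run_time(execution_date, timedelta(days=1))
print(execution_date.date(), "->", run_time)
```

The execution date therefore labels the data interval, while the run itself starts one interval later.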

However, when a DAG triggers another DAG, the execution date of the triggered run is set to now().

Is there a way to give the triggered DAG the same execution date as the triggering DAG? Of course, I could rewrite the templates to use yesterday_ds, but that is a tricky workaround.

Recommended answer

The following class expands on TriggerDagRunOperator to allow passing the execution date as a string that then gets converted back into a datetime. It's a bit hacky but it is the only way I found to get the job done.

from datetime import datetime
import logging

from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder

class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing explicit execution date
    (otherwise it's hard to hook the datetime.now() date).
    Use when you want to explicitly set the execution date on the target DAG
    from the controller DAG.

    Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
    http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e

    Parameters
    ------------------
    execution_date: str
        the custom execution date (jinja'd)

    Usage Example:
    -------------------
    my_dag_trigger_operator = MMTTriggerDagRunOperator(
        execution_date="{{execution_date}}",
        task_id='my_dag_trigger_operator',
        trigger_dag_id='my_target_dag_id',
        # called as python_callable(context, dro): return dro to trigger,
        # or None to skip
        python_callable=lambda context, dro: dro,
        params={},
        dag=my_controller_dag
    )
    """
    template_fields = ('execution_date',)

    def __init__(
        self, trigger_dag_id, python_callable, execution_date,
        *args, **kwargs
        ):
        self.execution_date = execution_date
        super(MMTTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id, python_callable=python_callable,
            *args, **kwargs
        )

    def execute(self, context):
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                # pass the parsed datetime rather than the rendered string
                execution_date=run_id_dt,
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")


There is an issue you may run into when using this with an execution_date other than now(): the operator will throw a MySQL error (a duplicate-key violation) if you try to start a DAG run with an identical execution_date twice. This is because dag_id and execution_date together form a unique index on the DAG run table, so a second row with the same pair cannot be inserted.

I can't think of a reason you would ever want to run two identical dags with the same execution_date in production anyway, but it is something I ran into while testing and you should not be alarmed by it. Simply clear the old job or use a different datetime.
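
The duplicate-key behaviour can be reproduced in isolation. The sketch below uses an in-memory SQLite table as a stand-in for MySQL, with a deliberately simplified two-column table that is not Airflow's real schema; `insert_run` is a hypothetical helper.

```python
import sqlite3

# Simplified stand-in for the DAG run table: only the two columns that
# make up the unique index discussed above.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    " dag_id TEXT NOT NULL,"
    " execution_date TEXT NOT NULL,"
    " UNIQUE (dag_id, execution_date))"
)


def insert_run(dag_id, execution_date):
    """Insert a run; return False if the same (dag_id, execution_date)
    pair already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO dag_run (dag_id, execution_date) VALUES (?, ?)",
                (dag_id, execution_date),
            )
        return True
    except sqlite3.IntegrityError:
        return False


first = insert_run("my_target_dag_id", "2016-02-19 00:00:00")
second = insert_run("my_target_dag_id", "2016-02-19 00:00:00")  # duplicate
third = insert_run("my_target_dag_id", "2016-02-20 00:00:00")   # new date
print(first, second, third)  # True False True
```

The second insert fails only because both columns match; changing the datetime, or removing the old row first, lets the insert through.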
