Airflow TriggerDagRunOperator: how to change the execution date

Question

I noticed that for scheduled tasks the execution date is set in the past, according to the following explanation:

Airflow was developed as a solution for ETL needs. In the ETL world, you typically summarize data. So, if I want to summarize data for 2016-02-19, I would do it at 2016-02-20 midnight GMT, which would be right after all data for 2016-02-19 becomes available.
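The convention in the quoted passage (a run is stamped with the start of its interval but only launched after that interval ends) can be sketched in plain Python. This is a simplified illustration, not Airflow's scheduler code; it assumes a daily schedule whose intervals start at midnight UTC, and the helper name `latest_execution_date` is hypothetical.

```python
from datetime import datetime, timedelta

# Assumption: a daily schedule whose intervals start at midnight (UTC).
INTERVAL = timedelta(days=1)


def latest_execution_date(now: datetime) -> datetime:
    """Return the execution date stamped on the most recent *completed* interval.

    The run for an interval carries the interval's start as its execution
    date and is only launched once that interval has ended.
    """
    # Start of the current, still-open interval.
    current_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # The last interval that has fully ended started one interval earlier.
    return current_start - INTERVAL


run_time = datetime(2016, 2, 20, 0, 0)  # midnight GMT on 2016-02-20
print(latest_execution_date(run_time))  # 2016-02-19 00:00:00
```

So the run launched at midnight on 2016-02-20 summarizes 2016-02-19 and carries that date, which is why a DAG triggered with `now()` ends up one interval "ahead" of its scheduled parent.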

However, when a DAG triggers another DAG, the execution time is set to now().

Is there a way to give the triggered DAGs the same execution time as the triggering DAG? Of course, I could rewrite the template to use yesterday_ds, but that is a tricky workaround.

Solution

The following class extends TriggerDagRunOperator to allow passing the execution date as a string that then gets converted back into a datetime. It's a bit hacky, but it is the only way I found to get the job done.

from datetime import datetime
import logging

from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder

class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing an explicit execution date
    (otherwise it's hard to hook the datetime.now() date).
    Use when you want to explicitly set the execution date on the target DAG
    from the controller DAG.

    Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
    http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e

    Parameters
    ------------------
    execution_date: str
        the custom execution date (jinja'd); must render as 'YYYY-MM-DD HH:MM:SS'

    Usage Example:
    -------------------
    my_dag_trigger_operator = MMTTriggerDagRunOperator(
        execution_date="{{ execution_date }}",
        task_id='my_dag_trigger_operator',
        trigger_dag_id='my_target_dag_id',
        # python_callable receives (context, dro): return dro to trigger, None to skip
        python_callable=lambda context, dro: dro,
        params={},
        dag=my_controller_dag
    )
    """
    # Let Airflow render Jinja templates in the execution_date argument.
    template_fields = ('execution_date',)

    def __init__(
            self, trigger_dag_id, python_callable, execution_date,
            *args, **kwargs):
        super(MMTTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id, python_callable=python_callable,
            *args, **kwargs
        )
        # Set after the parent __init__ so the parent cannot overwrite it.
        self.execution_date = execution_date

    def execute(self, context):
        # The template has been rendered by now; convert the string back to a datetime.
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        # The callable decides whether to trigger (return dro) or skip (return None).
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                execution_date=run_id_dt,  # the parsed datetime, not the raw string
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")
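The steps `execute()` performs before it touches the database can be tried without an Airflow installation. The sketch below is hedged: `DagRunOrderStandIn` is a hypothetical stand-in that keeps only the `run_id` and `payload` attributes the operator reads, and `plan_trigger` / `only_when_flagged` are illustrative names. What `{{ execution_date }}` actually renders to depends on your Airflow version, which is why the sketch also shows the strict format rejecting an ISO string with a `T` and an offset.

```python
from datetime import datetime
from typing import Any, Callable, Dict, Optional

FMT = '%Y-%m-%d %H:%M:%S'  # the only format execute() above accepts


class DagRunOrderStandIn:
    """Hypothetical stand-in for DagRunOrder: only run_id and payload."""

    def __init__(self, run_id: Optional[str] = None, payload: Any = None):
        self.run_id = run_id
        self.payload = payload


def plan_trigger(rendered: str, python_callable: Callable, context: Dict[str, Any]):
    """Mirror execute(): parse the string, build the run_id, ask the callable.

    Returns (execution_date, order), or None when the callable declines.
    """
    # Raises ValueError if the template rendered in a different format.
    execution_date = datetime.strptime(rendered, FMT)
    dro = DagRunOrderStandIn(run_id='trig__' + execution_date.isoformat())
    # The callable must accept (context, dro); returning None skips the trigger.
    dro = python_callable(context, dro)
    if not dro:
        return None
    return execution_date, dro


def only_when_flagged(context, dro):
    # Hypothetical condition: trigger only if the controller set params['trigger'].
    if context.get('params', {}).get('trigger'):
        dro.payload = {'note': 'set by controller'}  # would become conf on the new run
        return dro
    return None


plan = plan_trigger('2016-02-19 00:00:00', only_when_flagged, {'params': {'trigger': True}})
print(plan[0], plan[1].run_id)  # 2016-02-19 00:00:00 trig__2016-02-19T00:00:00
print(plan_trigger('2016-02-19 00:00:00', only_when_flagged, {'params': {}}))  # None

try:
    plan_trigger('2016-02-19T00:00:00+00:00', only_when_flagged, {'params': {'trigger': True}})
except ValueError as err:
    print('format mismatch:', err)
```

Note that the original usage example's `lambda: random.getrandbits(1)` takes no arguments, so it would fail with a `TypeError` when `execute()` calls it with `(context, dro)`; a callable with the two-argument shape above avoids that.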

There is an issue you may run into when using this and not setting execution_date=now(): your operator will throw a MySQL error if you try to start a DAG with an identical execution_date twice. This is because the dag_id and execution_date together form a unique index on the dag_run table, so a second row with the same pair cannot be inserted.
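The uniqueness rule can be reproduced in isolation with an in-memory SQLite table. This is a sketch only: the table is a hypothetical, heavily simplified stand-in for the dag_run table (just `dag_id` and `execution_date`), and SQLite raises `sqlite3.IntegrityError` where MySQL raised its own duplicate-entry error.

```python
import sqlite3
from datetime import datetime

# Hypothetical, simplified dag_run table: only the columns needed to show
# the (dag_id, execution_date) uniqueness rule.
conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE dag_run ('
    ' id INTEGER PRIMARY KEY,'
    ' dag_id TEXT NOT NULL,'
    ' execution_date TEXT NOT NULL,'
    ' UNIQUE (dag_id, execution_date))'
)


def insert_run(dag_id: str, execution_date: datetime) -> bool:
    """Try to insert a run; return False if the (dag_id, execution_date) pair exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                'INSERT INTO dag_run (dag_id, execution_date) VALUES (?, ?)',
                (dag_id, execution_date.isoformat()),
            )
        return True
    except sqlite3.IntegrityError:
        return False


day = datetime(2016, 2, 19)
print(insert_run('my_target_dag_id', day))  # True
print(insert_run('my_target_dag_id', day))  # False: duplicate pair
print(insert_run('my_target_dag_id', datetime(2016, 2, 19, 0, 0, 1)))  # True: different datetime
```

Changing either the DAG id or the execution date by even one second is enough to get a new row, which matches the advice below to use a different datetime.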

I can't think of a reason you would ever want to run the same DAG twice with the same execution_date in production anyway, but it is something I ran into while testing, and you should not be alarmed by it. Simply clear out the old run or use a different datetime.
