Execute Airflow DAG instances (tasks) on a list of specific dates


Question

I would like to manage a couple of future releases using Apache Airflow. All of these releases are known well in advance, and I need to make sure that no data push is forgotten.

The problem is that those future releases do not follow a simple periodic schedule that could be handled with a classic cron expression like 0 1 23 * * or a preset like @monthly.

It's 2019-08-24, 2019-09-30, 2019-10-20 ...

Is there a way to do this other than creating a separate mydag.py file for each of those future releases? What is the standard / recommended way to do this? Am I thinking about this the wrong way (I wonder because the documentation and tutorials focus mostly on regular, periodic schedules)?

Recommended answer

You could give your DAG a @daily schedule, then start it with a ShortCircuitOperator task that checks whether the execution date matches a release date. If it does, the check passes and the rest of the DAG runs. Otherwise, the rest of the DAG is skipped and no release happens. See an example of this operator being used in https://github.com/apache/airflow/blob/1.10.3/airflow/example_dags/example_short_circuit_operator.py.

I imagine it would look something like this:

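from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import ShortCircuitOperator

RELEASE_DATES = ['2019-08-24', '2019-09-30', '2019-10-20']

# Assumed minimal default_args; the original answer did not show them.
default_args = {'owner': 'airflow', 'start_date': datetime(2019, 8, 1)}

dag = DAG(
    dag_id='my_dag',
    schedule_interval='@daily',
    default_args=default_args,
)

def check_release_date(**context):
    # Pass only if the execution date ('ds', YYYY-MM-DD) is a release day.
    return context['ds'] in RELEASE_DATES

# Downstream tasks are skipped whenever the callable returns False.
skip_if_not_release_date = ShortCircuitOperator(
    task_id='skip_if_not_release_date',
    python_callable=check_release_date,
    dag=dag,
    provide_context=True,  # Airflow 1.x: pass the context as keyword arguments
)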
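
Because the check is an ordinary Python function, it can be exercised outside Airflow before it is wired into the DAG. The following is a minimal sketch under the assumption that a hand-built keyword argument ds stands in for the context Airflow would pass; the two sample dates are only illustrative.

```python
# Stand-alone sketch: no Airflow import is needed to exercise the check.
RELEASE_DATES = ['2019-08-24', '2019-09-30', '2019-10-20']


def check_release_date(**context):
    # 'ds' is the execution date as a YYYY-MM-DD string.
    return context['ds'] in RELEASE_DATES


# Hand-built contexts standing in for what Airflow would pass (assumption).
release_day = check_release_date(ds='2019-09-30')
ordinary_day = check_release_date(ds='2019-09-29')
print(release_day, ordinary_day)
```

Comparing strings works here only because both sides use the same YYYY-MM-DD format, so the entries in the list should be kept in that format.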

If release dates can change, then you might want to make this a little more dynamic with variables to make updates easy.

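from airflow.models import Variable

def check_release_date(**context):
    # Read the list from the 'release_dates' Variable, stored as a JSON array.
    release_dates = Variable.get('release_dates', deserialize_json=True)
    return context['ds'] in release_dates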
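
For the Variable-based check to work, the value stored under release_dates has to be a JSON array of date strings. The sketch below is an approximation that runs without Airflow: json.loads stands in for Variable.get with deserialize_json=True, and stored_value is a hypothetical example of what the Variable might contain.

```python
import json

# Hypothetical raw value, as it might be stored in the 'release_dates' Variable.
stored_value = '["2019-08-24", "2019-09-30", "2019-10-20"]'


def check_release_date(raw_value, **context):
    # Stand-in for Variable.get('release_dates', deserialize_json=True).
    release_dates = json.loads(raw_value)
    return context['ds'] in release_dates


is_release = check_release_date(stored_value, ds='2019-10-20')
not_release = check_release_date(stored_value, ds='2019-10-21')
print(is_release, not_release)
```

With this layout, changing the release calendar means editing the Variable's value rather than redeploying the DAG file.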

Also if for whatever reason you need to override your hardcoded list of release dates, you can mark this task as success to force the DAG to run.
