气流自定义 schedule_interval 设置 [英] Airflow custom schedule_interval setup
问题描述
有没有办法在 Airflow DAG 中设置/编写自定义 schedule_interval?我正在寻找的是一种在 DAG 除假期(如圣诞节、劳动节、独立日等)之外每天运行时设置时间表的方法
is there a way to set up/write a custom schedule_interval in an Airflow DAG? What I'm looking for is a way to set up a schedule when DAG runs on a daily basis except of holidays (like Christmas, Labor Day, Independence Day etc.)
使用标准的 cron 表达式是不可能实现的.非常感谢任何帮助/指南.
It is not possible to achieve with standard cron expressions. Any help/guide is much appreciated.
推荐答案
没有对这种类型的调度的本地支持,但您可以通过在工作流的开头添加 ShortCircuitOperator
来解决这个问题.这个操作符执行一个可调用的python.如果条件满足,则继续工作流,如果条件不满足,则将所有下游任务标记为已跳过.
There is no native support for this type of scheduling but you can solve this with adding ShortCircuitOperator
to the beginning of your workflow.
This operator execute a python callable. If condition met it continue workflow if condition doesn't met it mark all downstream tasks as skipped.
可能的解决方案是:
import holidays
def decide(**kwargs):
# Select country
us_holidays = holidays.US()
if str(kwargs['execution_date']) in us_holidays:
return False # Skip workflow if it's a holiday.
return True
dag = DAG(
dag_id='mydag',
schedule_interval='@daily',
default_args=default_args,
)
start_op = ShortCircuitOperator(
task_id='start_task',
python_callable=decide,
provide_context=True, # Remove this line if you are using Airflow>=2.0.0
dag=dag
)
#Replace this with your actual Operator in your workflow.
next_op = Operator(
task_id='next_task',
dag=dag
)
start_op >> next_op
此解决方案基于检测美国假期中提供的答案,我没有测试它,但它应该工作.在任何情况下,您都可以将 decide
中的逻辑替换为任何检测日期是否为假日的方法.
This solution is based on the answer provided in Detecting a US Holiday I didn't test it but it should work. In any case you can replace the logic in decide
to any method that detects if a date is a holiday or not.
这篇关于气流自定义 schedule_interval 设置的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!