气流-处理DAG回调的正确方法 [英] Airflow - Proper way to handle DAGs callbacks
问题描述
我有一个 DAG
,然后无论成功还是失败,我都希望它触发一种方法,该方法发布到Slack。
I have a DAG
and then whenever it success or fails, I want it to trigger a method which posts to Slack.
我的 DAG参数
如下:
default_args = {
[...]
'on_failure_callback': slack.slack_message(sad_message),
'on_success_callback': slack.slack_message(happy_message),
[...]
}
和 DAG
定义本身:
dag = DAG(
dag_id = dag_name_id,
default_args=default_args,
description='load data from mysql to S3',
schedule_interval='*/10 * * * *',
catchup=False
)
但是,当我检查Slack时,每分钟有100条以上的消息,就好像在评估每个调度程序的心跳信号一样,对于每条日志它都运行成功和失败方法,就好像它在同一个任务实例上可以工作,也不起作用(不是很好)。
But when I check Slack there is more than 100 message each minute, as if is evaluating at each scheduler heartbeat and for every log it did runned the success and failure method as if it worked and didn't work for the same task instance (not fine).
我应该如何正确使用 on_failure_callback
和 on_success_callback
处理dags状态并调用自定义方法?
How should I properly use the on_failure_callback
and on_success_callback
to handle dags statuses and call a custom method?
推荐答案
创建消息的原因是因为在定义 default_args $ c时$ c>,您正在执行功能。您只需要传递函数定义而不执行它。
The reason it's creating the messages is because when you are defining your default_args
, you are executing the functions. You need to just pass the function definition without executing it.
由于该函数具有自变量,因此会变得有些棘手。您可以定义两个部分函数或定义两个包装函数。
Since the function has an argument, it'll get a little trickier. You can either define two partial functions or define two wrapper functions.
因此,您可以执行以下操作:
So you can either do:
from functools import partial
success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);
default_args = {
[...]
'on_failure_callback': failure_msg
'on_success_callback': success_msg
[...]
}
或
def success_msg():
slack.slack_message(happy_message);
def failure_msg():
slack.slack_message(sad_message);
default_args = {
[...]
'on_failure_callback': failure_msg
'on_success_callback': success_msg
[...]
}
在任一方法中,请注意函数定义如何 failure_msg
和 success_msg
被传递,而不是它们执行时给出的结果。
In either method, note how just the function definition failure_msg
and success_msg
are being passed, not the result they give when executed.
这篇关于气流-处理DAG回调的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!