气流-处理DAG回调的正确方法 [英] Airflow - Proper way to handle DAGs callbacks

查看:175
本文介绍了气流-处理DAG回调的正确方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 DAG ,然后无论成功还是失败,我都希望它触发一种方法,该方法发布到Slack。

I have a DAG and then whenever it success or fails, I want it to trigger a method which posts to Slack.

我的 DAG参数如下:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}

DAG 定义本身:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )

但是,当我检查Slack时,每分钟有100条以上的消息,就好像在评估每个调度程序的心跳信号一样,对于每条日志它都运行成功和失败方法,就好像它在同一个任务实例上可以工作,也不起作用(不是很好)。

But when I check Slack there is more than 100 message each minute, as if is evaluating at each scheduler heartbeat and for every log it did runned the success and failure method as if it worked and didn't work for the same task instance (not fine).

我应该如何正确使用 on_failure_callback on_success_callback 处理dags状态并调用自定义方法?

How should I properly use the on_failure_callback and on_success_callback to handle dags statuses and call a custom method?

推荐答案

创建消息的原因是因为在定义 default_args ,您正在执行功能。您只需要传递函数定义而不执行它。

The reason it's creating the messages is because when you are defining your default_args, you are executing the functions. You need to just pass the function definition without executing it.

由于该函数具有自变量,因此会变得有些棘手。您可以定义两个部分函数或定义两个包装函数。

Since the function has an argument, it'll get a little trickier. You can either define two partial functions or define two wrapper functions.

因此,您可以执行以下操作:

So you can either do:

from functools import partial

success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

def success_msg():
    slack.slack_message(happy_message);

def failure_msg():
    slack.slack_message(sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

在任一方法中,请注意函数定义如何 failure_msg success_msg 被传递,而不是它们执行时给出的结果。

In either method, note how just the function definition failure_msg and success_msg are being passed, not the result they give when executed.

这篇关于气流-处理DAG回调的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆