气流默认on_failure_callback [英] Airflow default on_failure_callback

查看:112
本文介绍了气流默认on_failure_callback的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的DAG文件中,我定义了一个on_failure_callback()函数以在失败时发布Slack.

In my DAG file, I have define a on_failure_callback() function to post a Slack in case of failure.

如果我在DAG中为每个运算符指定一个,它将很好地工作:on_failure_callback = on_failure_callback()

It works well if I specify for each operator in my DAG : on_failure_callback=on_failure_callback()

是否有一种方法可以自动(例如通过default_args或通过我的DAG对象)将调度分配给我所有的运算符?

Is there a way to automate (via default_args for instance, or via my DAG object) the dispatch to all of my operators?

推荐答案

我终于找到了一种方法.

I finally found a way to do that.

您可以将on_failure_callback作为default_args传递

You can pass your on_failure_callback as a default_args

class Foo:
  @staticmethod
  def get_default_args():
      """
      Return default args
      :return: default_args
      """

      default_args = {
          'on_failure_callback': Foo.on_failure_callback
      }

      return default_args

  @staticmethod
  def on_failure_callback(context):
     """
     Define the callback to post on Slack if a failure is detected in the Workflow
     :return: operator.execute
     """

     operator = SlackAPIPostOperator(
         task_id='failure',
         text=str(context['task_instance']),
         token=Variable.get("slack_access_token"),
         channel=Variable.get("slack_channel")
     )

     return operator.execute(context=context) 

这篇关于气流默认on_failure_callback的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆