气流默认on_failure_callback [英] Airflow default on_failure_callback
本文介绍了气流默认on_failure_callback的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
在我的DAG文件中,我定义了一个on_failure_callback()函数以在失败时发布Slack.
In my DAG file, I have define a on_failure_callback() function to post a Slack in case of failure.
如果我在DAG中为每个运算符指定一个,它将很好地工作:on_failure_callback = on_failure_callback()
It works well if I specify for each operator in my DAG : on_failure_callback=on_failure_callback()
是否有一种方法可以自动(例如通过default_args或通过我的DAG对象)将调度分配给我所有的运算符?
Is there a way to automate (via default_args for instance, or via my DAG object) the dispatch to all of my operators?
推荐答案
我终于找到了一种方法.
I finally found a way to do that.
您可以将on_failure_callback作为default_args传递
You can pass your on_failure_callback as a default_args
class Foo:
@staticmethod
def get_default_args():
"""
Return default args
:return: default_args
"""
default_args = {
'on_failure_callback': Foo.on_failure_callback
}
return default_args
@staticmethod
def on_failure_callback(context):
"""
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
"""
operator = SlackAPIPostOperator(
task_id='failure',
text=str(context['task_instance']),
token=Variable.get("slack_access_token"),
channel=Variable.get("slack_channel")
)
return operator.execute(context=context)
这篇关于气流默认on_failure_callback的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文