如何将参数传递给 Airflow on_success_callback 和 on_failure_callback [英] How to pass parameters to Airflow on_success_callback and on_failure_callback

查看:39
本文介绍了如何将参数传递给 Airflow on_success_callback 和 on_failure_callback的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经使用 on_success_callback 和 on_failure_callback 实现了关于成功和失败的电子邮件警报.

I have implemented email alerts on success and failure using on_success_callback and on_failure_callback.

根据气流文档

上下文字典作为单个参数传递给此函数.

a context dictionary is passed as a single parameter to this function.

如何向这些回调方法传递另一个参数?

How can I pass another parameter to these callback methods?

这是我的代码

from airflow.utils.email import send_email_smtp

def task_success_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Success".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Succeeded on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

def task_failure_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Failed on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

我打算将回调移动到另一个包并将电子邮件地址作为参数传递.

I intend to move the callbacks to another package and pass the email address as parameter.

推荐答案

您可以在 dag 中定义一个函数,该函数从您的包中调用该函数.在调用该函数时,将电子邮件作为参数传递.您可以在 DAG 级别进一步细化它以仅传递电子邮件所需的信息.

You could define a function inside your dag that calls the function from your package. And while calling that function, pass email as an argument. You can refine it further at your DAG level to pass only information required for the emails.

from package import outer_task_success_callback
email = 'xyz@example.com'

def task_success_alert(context):
    dag_id = context['dag'].dag_id
    task_id = context['task_instance']. task_id
    outer_task_success_callback(dag_id, task_id, email)
    
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

这将允许您在调用包中的函数之前进行自定义.

This will allow you to customize before you call the function in your package.

顺便说一句,airflow 具有 smtp 电子邮件功能.您可以利用这些解决方案,而不是编写自己的解决方案.

On a side note, airflow has smtp email functionality. Instead of writing your own solution, you can utilize those.

这篇关于如何将参数传递给 Airflow on_success_callback 和 on_failure_callback的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆