Email on failure using AWS SES in Apache Airflow DAG

Question

I am trying to have Airflow email me using AWS SES whenever a task in my DAG fails to run or retries. I am using my AWS SES credentials rather than my general AWS credentials.

My current airflow.cfg:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com

Current task in my DAG that is designed to intentionally fail and retry:

testfaildag_library_install_jar_jdbc = PythonOperator(
    task_id='library_install_jar',
    retries=3,
    retry_delay=timedelta(seconds=15),
    python_callable=add_library_to_cluster,
    params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path': 's3://fakepath.jar'},
    dag=dag,
    email_on_failure=True,
    email_on_retry=True,
    email='myname@myjob.com',
    provide_context=True
)

Everything works as designed: the task retries the set number of times and ultimately fails, except that no emails are being sent. I have checked the logs of the task mentioned above too, and smtp is never mentioned.

I've looked at the similar question here, but the only solution there did not work for me. Additionally, Airflow's documentation, such as their example here, does not seem to work for me either.

Is it possible for Airflow's email_on_failure and email_on_retry features to work with SES?

What I am currently thinking of doing is using the on_failure_callback function to call a Python script provided by AWS here to send an email on failure, but that is not the preferable route at this point.
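
For reference, a minimal sketch of what that callback route could look like, using boto3's SES send_email call. The callback name, region, and addresses below are placeholders of my own, not from AWS's script, and it assumes boto3 credentials with ses:SendEmail permission:

# Hypothetical on_failure_callback that emails through SES via boto3.
import boto3

def notify_failure(context):
    # Airflow passes a context dict to on_failure_callback.
    ti = context['task_instance']
    subject = 'Airflow task failed: {}.{}'.format(ti.dag_id, ti.task_id)
    body = 'Execution date: {}\nLog URL: {}'.format(
        context['execution_date'], ti.log_url)

    ses = boto3.client('ses', region_name='us-east-1')  # assumed region
    ses.send_email(
        Source='myname@myjob.com',  # must be an SES-verified sender
        Destination={'ToAddresses': ['myname@myjob.com']},
        Message={
            'Subject': {'Data': subject},
            'Body': {'Text': {'Data': body}},
        },
    )

The callback would then be attached to the operator with on_failure_callback=notify_failure instead of (or alongside) email_on_failure.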

Thanks, I appreciate any help.

Answer

--updated 6/8 with working SES

Here's my write-up on how we got it all working. There is a small summary at the bottom of this answer.

Big points:

  1. Originally we decided not to use Amazon SES and to use sendmail instead; as of the 6/8 update we now have SES up and working.
  2. It is the airflow worker that services the email_on_failure and email_on_retry features. You can do journalctl -u airflow-worker -f to monitor it during a DAG run. On your production server, you do NOT need to restart your airflow-worker after changing your airflow.cfg with new smtp settings; it should be picked up automatically. No need to worry about messing up currently running DAGs.

Here is the technical write-up on how to use sendmail:

Since we changed from SES to sendmail on localhost, we had to change our smtp settings in airflow.cfg.

The new config is:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from = myjob@mywork.com

This works in both production and local Airflow instances.

Some common errors one might receive if their config is not like mine above:

  • socket.error: [Errno 111] Connection refused -- you must change your smtp_host line in airflow.cfg to localhost
  • smtplib.SMTPException: STARTTLS extension not supported by server. -- you must change your smtp_starttls in airflow.cfg to False
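
A quick way to reproduce those two checks from Python, independent of Airflow (a diagnostic sketch of my own, not from the original post; substitute your smtp_host and smtp_port values):

import smtplib

host, port = 'localhost', 25  # your smtp_host / smtp_port

try:
    server = smtplib.SMTP(host, port, timeout=10)
except OSError as exc:
    # e.g. [Errno 111] Connection refused: wrong smtp_host, or nothing listening
    print('cannot connect: {}'.format(exc))
else:
    server.ehlo()
    if server.has_extn('starttls'):
        print('server advertises STARTTLS; smtp_starttls = True will work')
    else:
        # matches "STARTTLS extension not supported by server"
        print('no STARTTLS; set smtp_starttls = False')
    server.quit()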

In my local testing, I tried to simply force Airflow to show a log of what was going on when it tried to send an email, so I created a fake DAG as follows:

# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# General imports
from datetime import datetime,timedelta

def throwerror():
    raise ValueError("Failure")

SPARK_V_2_2_1 = '3.5.x-scala2.11'

args = {
    'owner': 'me',
    'email': ['me@myjob'],
    'depends_on_past': False,
    'start_date': datetime(2018, 5,24),
    'end_date':datetime(2018,6,28)
}

dag = DAG(
    dag_id='testemaildag',
    default_args=args,
    catchup=False,
    schedule_interval="* 18 * * *"
    )

t1 = DummyOperator(
    task_id='extract_data',
    dag=dag
)

t2 = PythonOperator(
    task_id='fail_task',
    dag=dag,
    python_callable=throwerror
)

t2.set_upstream(t1)

If you do journalctl -u airflow-worker -f, you can see that the worker says it has sent an alert email on the failure to the email in your DAG, but we were still not receiving the email. We then decided to look into the mail logs of sendmail by doing cat /var/log/maillog. We saw a log like this:

Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<myjob@mycompany.com>, size=1297, nrcpt=1 (queue active)
Jun  5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun  5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out

So this is probably the biggest "Oh duh" moment. Here we are able to see what is actually going on in our smtp service. We used telnet to confirm that we were not able to connect to the targeted gmail IP ranges.

We determined that the email was attempting to be sent, but that the sendmail service was unable to connect to those IP ranges successfully.

We decided to allow all outbound traffic on port 25 in AWS (as our airflow production environment is an EC2 instance), and it now works successfully. We are now able to receive emails on failures and retries (tip: email_on_failure and email_on_retry default to True in the DAG API Reference - you do not need to put them in your args if you do not want to, but it is still good practice to explicitly state True or False there).
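
To illustrate that tip, here is how the flags could be stated explicitly in default_args (a sketch; both values shown are already the defaults):

from datetime import datetime, timedelta

args = {
    'owner': 'me',
    'email': ['me@myjob'],
    'email_on_failure': True,  # mail when the task finally fails
    'email_on_retry': True,    # mail on each retry
    'retries': 3,
    'retry_delay': timedelta(seconds=15),
    'start_date': datetime(2018, 5, 24),
}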

SES now works. Here is the Airflow config:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
# smtp_mail_from must be a verified SES email address
smtp_mail_from = myemail@myjob.com
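
To sanity-check those [smtp] values outside Airflow, a standalone sketch like the following can attempt the same STARTTLS login. The host, credentials, and addresses are placeholders; note that SES expects the SMTP credentials generated in the SES console for smtp_user/smtp_password, and while the account is in the SES sandbox both sender and recipient must be verified:

import smtplib
from email.mime.text import MIMEText

msg = MIMEText('SES SMTP settings look good.')
msg['Subject'] = 'Airflow SMTP test'
msg['From'] = 'myemail@myjob.com'  # verified SES sender
msg['To'] = 'myemail@myjob.com'

# Mirrors smtp_host / smtp_port / smtp_starttls from the config above.
with smtplib.SMTP('emailsmtpserver.region.amazonaws.com', 587, timeout=10) as server:
    server.starttls()
    server.login('SMTP_USERNAME', 'SMTP_PASSWORD')  # SES SMTP credentials
    server.send_message(msg)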

Thanks!
