气流获取重试编号 [英] Airflow Get Retry Number

查看:93
本文介绍了气流获取重试编号的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在Airflow DAG中,我需要执行一项任务,该任务需要了解它是第一次运行还是重试运行。如果尝试重试,我需要调整任务的逻辑。

In my Airflow DAG I have a task that needs to know if it's the first time it's ran or if it's a retry run. I need to adjust my logic in the task if it's a retry attempt.

我对如何存储任务重试次数有一些想法,但是我我不确定它们中的任何一个是否合法,或者是否有一种简便的方法可以在任务中获取此信息。

I have a few ideas on how I could store the number of retries for the task but I'm not sure if any of them are legitimate or if there's an easier built in way to get this information within the task.


  • 我想知道是否可以在每次运行任务时附加的dag中有一个整数变量。然后,如果该任务重新运行,我可以检查该变量的值以确保其大于1,因此可以重试运行。但是我不确定可变全局变量是否可以在Airflow中以这种方式工作,因为可以有多个工作人员来执行不同的任务(不过我不确定)。

  • I'm wondering if I can just have an integer variable inside the dag that I append every time the task runs. Then if the task if reran I could check the value of the variable to see that it's greater than 1 and hence would be a retry run. But I'm not sure if mutable global variables work that way in Airflow since there can be multiple workers for different tasks (I'm not sure on this though).

将其写入XCOM变量吗?

Write it in an XCOM variable?

推荐答案

重试编号可从任务实例获得,可通过宏 {{task_instance}} 来获得。 https://airflow.apache.org/code.html#default-variables

The retry number is available from the task instance, which is available via the macro {{ task_instance }}. https://airflow.apache.org/code.html#default-variables

如果您使用的是python运算符,只需将 provide_context = True,添加到运算符kwargs中,然后在callable do kwargs ['task_instance']。try_number

If you are using the python operator simply add provide_context=True, to your operator kwargs, and then in the callable do kwargs['task_instance'].try_number

否则,您可以执行以下操作:

Otherwise you can do something like:

t = BashOperator(
    task_id='try_number_test',
    bash_command='echo "{{ task_instance.try_number }}"',
    dag=dag)

编辑:

清除任务实例后,它将把max_retry号设置为当前try_number +重试值。因此,您可以执行以下操作:

When the task instance is cleared, it will set the max_retry number to be the current try_number + retry value. So you could do something like:

ti = # whatever method you do to get the task_instance object
is_first = ti.max_tries - ti.task.retries + 1 == ti.try_number

气流将使try_number增加运行时为1,因此我想从配置的重试值中减去max_tries时需要+ 1。但是我没有测试确认

Airflow will increments the try_number by 1 when running, so I imagine you'd need the + 1 when subtracting the max_tries from the configured retry value. But I didn't test that to confirm

这篇关于气流获取重试编号的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆