Apache Airflow tasks are stuck in an 'up_for_retry' state

Problem Description

I've been setting up an airflow cluster on our system and previously it has been working. I'm not sure what I may have done to change this.

I have a DAG which I want to run on a schedule. To make sure it's working I'd also like to trigger it manually. Neither of these seem to be working at the moment and no logs seem to be being written for the task instances. The only logs available are the airflow scheduler logs which generally look healthy.

I keep getting the following message:

Task is not ready for retry yet but will be retried automatically. Current date is 2018-12-12T11:34:46.978355+00:00 and task will be retried at 2018-12-12T11:35:08.093313+00:00.

However, if I wait a little the exact same message is presented again except the times have moved forward a little. Therefore, it seems the task is never actually being retried.

I am using a LocalExecutor and the task is an SSHOperator. Simplified code below. All it does is ssh onto another machine and start a bunch of applications with a pre-determined directory structure:

# Imports assumed from an Airflow 1.x install (they are not shown in the original post).
import json
import re
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.utils import timezone

# ConfigDbClient is the asker's own configuration client; its import is not shown.

DB_INFO_FILE = 'info.json'
START_SCRIPT = '/bin/start.sh'
TIME_IN_PAST = timezone.convert_to_utc(datetime.today() - timedelta(days=1))

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': TIME_IN_PAST,
    'email': ['some_email@blah.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

def _extract_instance_id(instance_string):
    return re.findall(r'\d+', instance_string)[0]

def _read_file_as_json(file_name):
    with open(file_name) as open_file:
         return json.load(open_file)

DB_INFO = _read_file_as_json(DB_INFO_FILE)
CONFIG_CLIENT = ConfigDbClient(**DB_INFO)

APP_DIRS = CONFIG_CLIENT.get_values('%my-app-info%')

INSTANCE_START_SCRIPT_PATHS = {
    _extract_instance_id(instance_string): directory+START_SCRIPT
    for instance_string, directory in APP_DIRS.items()
    }

# Create an ssh hook which refers to pre-existing connection information
# setup and stored by airflow
SSH_HOOK = SSHHook(ssh_conn_id='my-conn-id')

# Create a DAG object to add tasks to
DAG = DAG('my-dag-id',
          default_args=DEFAULT_ARGS)

# Create a task for each app instance.
for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
    task = SSHOperator(
        task_id='run-script-{0}'.format(instance_id),
        command='bash {0}'.format(start_script),
        ssh_hook=SSH_HOOK,
        dag=DAG)

It works when I run the tasks individually via the command line, but not via the UI. It seems I can run tasks, but I simply cannot trigger a DAG to run. I've also tried many combinations of start_date values and interval schedules just to sanity check.

Any ideas?

And yes, I am aware this question has been asked before and I have looked at all of them, but none of the solutions have helped me.

Answer

Oh. Your start_date is changing at the same rate or faster than the schedule interval period.

Here's what the scheduler is seeing every couple of seconds:

start_date: 2018-12-11T12:12:12.000Z  # E.G. IFF now is 2018-12-12T12:12:12.000Z, a day ago
schedule_interval: timedelta(days=1)  # The default

Here's what the scheduler needs for a DAG to run: the last run occurred more than one schedule interval ago. If no scheduled run has occurred yet, the first scheduled run can start now only if one full schedule interval has passed since the start_date, because that is the earliest allowable execution_date. In that case a dag_run should be created with its execution_date set to the start of that interval. Then task_instances can be created for any tasks in the DAG whose dependencies are met, as long as the task_instance's execution_date is after the DAG's start_date (the start_date is not stored on the dag_run object but is recomputed by loading the DAG file whenever the dag's state is inspected).
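As a minimal sketch of that timing (a simplification of the description above, not Airflow's actual scheduler code, and assuming the default daily schedule_interval):

from datetime import datetime, timedelta

schedule_interval = timedelta(days=1)              # the default when none is set on the DAG
start_date = datetime(2018, 12, 11, 12, 12, 12)    # fixed, one day in the past

first_execution_date = start_date                       # the first run covers this interval...
first_trigger_time = start_date + schedule_interval     # ...and is only created once the interval ends

print(first_execution_date)  # 2018-12-11 12:12:12
print(first_trigger_time)    # 2018-12-12 12:12:12, only ever reached if start_date stops moving forward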

So it's not getting scheduled automatically, because the start_date keeps moving forward just as the interval is satisfied. However, if it were set two days in the past, at least one run would get scheduled, and any further runs would then have to wait until one day after that. It's easier, though, if you just set a fixed datetime as your start_date.
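For example, a fixed start_date in the DEFAULT_ARGS from the question would look like this (the exact date is arbitrary; what matters is that it is not recomputed from datetime.today() on every parse):

from datetime import datetime, timedelta

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 12, 10),  # a constant point in the past, not derived from "now"
    'email': ['some_email@blah.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}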

But what about those odd retries on your manual runs…

You did start a manual run or two. These runs took the current time as the execution_date unless you specified something else. That should be after the start_date, at least until tomorrow, which should clear them to run. But then in your logs you're seeing that they fail and get marked for retry, and that your retries are not being decremented. I'm not sure why that would be, but it could be that something isn't right with the SSHOperator.

Did you install airflow with the [ssh] extra so that SSHOperator's dependencies (specifically paramiko and sshtunnel) are met on both the webserver and the scheduler? One of them is working, because I assume the DAG is being parsed and showing up in the UI after being added to the DB.
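If not, installing the extra pulls those dependencies in; for example, with the apache-airflow distribution (run wherever the scheduler and webserver are installed):

pip install 'apache-airflow[ssh]'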

What do you get when you run:

airflow test my-dag-id run-script-an_instance_id 2018-12-12T12:12:12

You know that the scheduler and webserver loop over refilling their DAG bag, so this DAG file gets re-run a few thousand times a day, reloading that JSON (it's local access, so similar to importing a module) and recreating that SSHHook with a DB lookup. I don't see anything fancy being done to set up this hook, so why not remove ssh_hook from the SSHOperator and replace it with ssh_conn_id='my-conn-id', so the hook can be created once at execution time? I doubt that's the issue causing the retries that just roll forward, though.
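A minimal sketch of that change, keeping the rest of the loop from the question as-is (ssh_conn_id is a standard SSHOperator argument, and 'my-conn-id' is the connection already stored in Airflow):

for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
    task = SSHOperator(
        task_id='run-script-{0}'.format(instance_id),
        command='bash {0}'.format(start_script),
        ssh_conn_id='my-conn-id',  # the hook is built from this connection at execution time
        dag=DAG)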
