Airflow "none_failed" skipping when upstream skips


Question



I have a workflow where I have two parallel processes (sentinel_run and sentinel_skip) which should run or be skipped based on a condition, and then join together (resolve). I need tasks directly downstream of either sentinel_ task to have cascaded skipping, but when it gets to the resolve task, resolve should run unless there are failures in either process upstream.

Based on the documentation, the "none_failed" trigger rule should work:

none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped

and it's also an answer to a related question.
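My reading of that rule, paraphrased as a predicate over parent task states (an illustrative sketch, not Airflow's actual scheduler code):

```python
# Illustrative sketch of how I read the "none_failed" trigger rule;
# this is NOT Airflow's real implementation.
def none_failed(parent_states):
    """Task may run if no parent is 'failed' or 'upstream_failed'."""
    return all(state not in ("failed", "upstream_failed") for state in parent_states)

# A mix of successes and skips should still allow the task to run:
print(none_failed(["success", "skipped"]))   # True
print(none_failed(["success", "failed"]))    # False
```

So a parent that was merely skipped should not block the downstream task.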

However, when I implemented a trivial example, that's not what I'm seeing:

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago

dag = DAG(
    "testing",
    catchup=False,
    schedule_interval="30 12 * * *",
    default_args={
        "owner": "test@gmail.com",
        "start_date": days_ago(1),
        "catchup": False,
        "retries": 0
    }
)

start = DummyOperator(task_id="start", dag=dag)

sentinel_run = ShortCircuitOperator(task_id="sentinel_run", dag=dag, python_callable=lambda: True)
sentinel_skip = ShortCircuitOperator(task_id="sentinel_skip", dag=dag, python_callable=lambda: False)

a = DummyOperator(task_id="a", dag=dag)
b = DummyOperator(task_id="b", dag=dag)
c = DummyOperator(task_id="c", dag=dag)
d = DummyOperator(task_id="d", dag=dag)
e = DummyOperator(task_id="e", dag=dag)
f = DummyOperator(task_id="f", dag=dag)
g = DummyOperator(task_id="g", dag=dag)

resolve = DummyOperator(task_id="resolve", dag=dag, trigger_rule="none_failed")

start >> sentinel_run >> a >> b >> c >> resolve
start >> sentinel_skip >> d >> e >> f >> resolve

resolve >> g

This code creates the following dag:

The issue is that the resolve task should execute (because nothing upstream is either upstream_failed or failed), but it's skipping instead.

I've introspected the database, and there aren't any failed or upstream failed tasks hiding, and I can't figure out why it wouldn't honor the "none_failed" logic.

I know about the "ugly workaround" and have implemented it in other workflows, but it adds another task to execute, and increases the complexity that new users to the DAG have to grok (especially when you multiply this by multiple tasks...). This was my primary reason for upgrading from Airflow 1.8 to Airflow 1.10, so I'm hoping there's just something obvious I'm missing...

Answer

Documenting this because this issue has bitten me twice and now I've solved it twice.

Problem Analysis

When you turn the log level to DEBUG, you start to see what's going on:

[2019-10-09 18:30:05,472] {python_operator.py:114} INFO - Done. Returned value was: False
[2019-10-09 18:30:05,472] {python_operator.py:159} INFO - Condition result is False
[2019-10-09 18:30:05,472] {python_operator.py:165} INFO - Skipping downstream tasks...
[2019-10-09 18:30:05,472] {python_operator.py:168} DEBUG - Downstream task_ids [<Task(DummyOperator): f>, <Task(DummyOperator): g>, <Task(DummyOperator): d>, <Task(DummyOperator): resolve>, <Task(DummyOperator): e>]
[2019-10-09 18:30:05,492] {python_operator.py:173} INFO - Done.

From this, you can see that the problem isn't that "none_failed" is handling the tasks incorrectly; rather, the sentinel simulating the skip condition is directly marking all of its downstream dependencies as skipped. This is the behavior of the ShortCircuitOperator: it skips all of the downstreams, including tasks downstream of downstream tasks.
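To see why resolve ends up in that skip list, here's a toy model of the example DAG's edges (plain Python, not Airflow internals) comparing the transitive downstream set, which is what the ShortCircuitOperator skips, with the direct downstream set:

```python
# Toy adjacency model of the example DAG (not Airflow code).
edges = {
    "start": ["sentinel_run", "sentinel_skip"],
    "sentinel_run": ["a"], "a": ["b"], "b": ["c"], "c": ["resolve"],
    "sentinel_skip": ["d"], "d": ["e"], "e": ["f"], "f": ["resolve"],
    "resolve": ["g"], "g": [],
}

def transitive_downstream(task):
    """All tasks reachable downstream of `task` (what ShortCircuitOperator skips)."""
    seen = set()
    stack = list(edges[task])
    while stack:
        t = stack.pop()
        if t not in seen:
            seen.add(t)
            stack.extend(edges[t])
    return seen

# ShortCircuitOperator skips the whole reachable set, including resolve and g:
print(sorted(transitive_downstream("sentinel_skip")))  # ['d', 'e', 'f', 'g', 'resolve']
# Skipping only the direct children would stop at d and let trigger rules decide the rest:
print(edges["sentinel_skip"])  # ['d']
```

Because resolve is in the transitive set, it gets force-skipped before its "none_failed" trigger rule is ever evaluated.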

Solution

The solution to this problem lies in recognizing that it's the behavior of the ShortCircuitOperator, not the TriggerRule, which is causing the problem. Once we realize that, it's time to set about writing an operator better suited to the task we're actually trying to accomplish.

I've included the operator I'm currently using; I'd welcome any input on a better way to handle the modification of the single downstream tasks. I'm sure there's a better idiom for "skip just the next one and let the rest cascade according to their trigger rules", but I've already spent more time than I wanted on this and I suspect the answer lies even deeper in the internals.

"""Sentinel Operator Plugin"""

import datetime

from airflow import settings
from airflow.models import SkipMixin, TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import State


class SentinelOperator(PythonOperator, SkipMixin):
    """
    Allows a workflow to continue only if a condition is met. Otherwise, the
    workflow skips cascading downstream to the next time a viable task
    is identified.

    The SentinelOperator is derived from the PythonOperator. It evaluates a
    condition and stops the workflow if the condition is False. Immediate
    downstream tasks are skipped. If the condition is True, downstream tasks
    proceed as normal.

    The condition is determined by the result of `python_callable`.
    """
    def execute(self, context):
        condition = super(SentinelOperator, self).execute(context)
        self.log.info("Condition result is %s", condition)

        if condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')

        session = settings.Session()

        for task in context['task'].downstream_list:
            ti = TaskInstance(task, execution_date=context['ti'].execution_date)
            self.log.info('Skipping task: %s', ti.task_id)
            ti.state = State.SKIPPED
            ti.start_date = datetime.datetime.now()
            ti.end_date = datetime.datetime.now()
            session.merge(ti)

        session.commit()
        session.close()

        self.log.info("Done.")


class Plugin_SentinelOperator(AirflowPlugin):
    name = "sentinel_operator"
    operators = [SentinelOperator]

With the modifications, this then produces the intended dag results:
