Airflow task after BranchPythonOperator does not fail and succeed correctly


Problem description


In my DAG, I have some tasks that should only be run on Saturdays. Therefore I used a BranchPythonOperator to branch between the tasks for Saturdays and a DummyTask. After that, I join both branches and want to run other tasks.

The workflow looks like this: dummy1 runs first, then branch_on_saturday branches to either dummy2 or today_is_not_saturday, and both branches join again at dummy3. Here I set the trigger rule for dummy3 to 'one_success' and everything works fine.

The problem I encountered is when something upstream of the BranchPythonOperator fails: the BranchPythonOperator and the branches correctly get the state 'upstream_failed', but the task joining the branches becomes 'skipped', so the whole workflow shows 'success'.

I tried using 'all_success' as the trigger rule; then, if something fails, the whole workflow correctly fails, but if nothing fails, dummy3 gets skipped.

I also tried 'all_done' as the trigger rule; then it works correctly if nothing fails, but if something fails, dummy3 still gets executed.
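For reference, the two attempts differ only in the trigger_rule argument on the join task:

# Attempt 1: an upstream failure fails the DAG, but dummy3 is skipped on success
dummy3 = DummyOperator(task_id='dummy3', dag=dag, trigger_rule='all_success')

# Attempt 2: works when nothing fails, but dummy3 still runs after an upstream failure
dummy3 = DummyOperator(task_id='dummy3', dag=dag, trigger_rule='all_done')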

My test code looks like this:

from datetime import datetime, date
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('test_branches',
          description='Test branches',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 1))


def python1():
    # Deliberately fail, to simulate an upstream failure;
    # swap the raise for the print to simulate success instead.
    raise Exception('Test failure')
    # print('Test success')


dummy1 = PythonOperator(
    task_id='python1',
    python_callable=python1,
    dag=dag
)


dummy2 = DummyOperator(
    task_id='dummy2',
    dag=dag
)


dummy3 = DummyOperator(
    task_id='dummy3',
    dag=dag,
    trigger_rule='one_success'  # join task: run as soon as one branch succeeds
)


def is_saturday():
    # date.weekday() numbers Monday as 0, so Saturday is 5 (6 would be Sunday)
    if date.today().weekday() == 5:
        return 'dummy2'
    else:
        return 'today_is_not_saturday'


branch_on_saturday = BranchPythonOperator(
    task_id='branch_on_saturday',
    python_callable=is_saturday,
    dag=dag)


not_saturday = DummyOperator(
    task_id='today_is_not_saturday',
    dag=dag
)

dummy1 >> branch_on_saturday >> dummy2 >> dummy3
branch_on_saturday >> not_saturday >> dummy3

EDIT

I just figured out an ugly workaround: dummy4 represents a task that I actually need to run, dummy5 is just a dummy, and dummy3 still has the trigger rule 'one_success'.

Now dummy3 and dummy4 run if there is no upstream failure; dummy5 'runs' if the day is not Saturday and gets skipped if the day is Saturday, which means the DAG is marked as success in both cases.
If there is a failure upstream, dummy3 and dummy4 get skipped, dummy5 gets marked as 'upstream_failed', and the DAG is marked as failed.
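A minimal sketch of how that wiring could look (the names dummy4 and dummy5 come from the post; the exact dependencies are an assumption):

dummy4 = DummyOperator(task_id='dummy4', dag=dag)  # the task I actually need to run
dummy5 = DummyOperator(task_id='dummy5', dag=dag)  # pure dummy, only there for its final state

# dummy4 follows the join: it runs when dummy3 runs and is skipped when dummy3 is skipped
dummy3 >> dummy4
# dummy5 keeps the default 'all_success' rule: skipped on Saturdays (its branch is not chosen),
# 'upstream_failed' when something upstream fails, which fails the DAG
not_saturday >> dummy5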

This workaround makes my DAG run as I want it to, but I'd still prefer a solution without such a hacky workaround.

Solution

Setting the trigger rule for dummy3 to 'none_failed' makes it end with the expected status in all cases.

see https://airflow.apache.org/concepts.html#trigger-rules
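Applied to the test DAG above, the fix is a one-line change on the join task:

dummy3 = DummyOperator(
    task_id='dummy3',
    dag=dag,
    trigger_rule='none_failed'  # run when no upstream task has failed; skipped branches are fine
)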


EDIT: it looks like the 'none_failed' trigger rule did not yet exist when this question was asked and answered; it was added in November 2018.

see https://github.com/apache/airflow/pull/4182
