Manual DAG run set individual task state

Question

I have a DAG without a schedule (it is run manually as needed). It has many tasks. Sometimes I want to 'skip' some initial tasks by manually changing their state to SUCCESS. Changing a task's state in a manually triggered DAG run fails, seemingly because of a bug in parsing the execution_date.

Is there another way to set task states individually for a manually executed DAG run?

Example below. The execution date of the task is 01-13T17:27:13.130427, and I believe the fractional seconds (the six-digit microseconds part) are not being parsed correctly.

Traceback (most recent call last):
  File "/opt/conda/envs/jumpman_prod/lib/python3.6/site-packages/airflow/www/views.py", line 2372, in set_task_instance_state
    execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
  File "/opt/conda/envs/jumpman_prod/lib/python3.6/_strptime.py", line 565, in _strptime_datetime
    tt, fraction = _strptime(data_string, format)
  File "/opt/conda/envs/jumpman_prod/lib/python3.6/_strptime.py", line 365, in _strptime
    data_string[found.end():])
ValueError: unconverted data remains: ..130427
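
The traceback shows the view parsing the timestamp with '%Y-%m-%d %H:%M:%S', a format that has no field for fractional seconds, so strptime stops after the seconds and complains about the leftover microseconds. The sketch below reproduces that failure with the standard library and shows a tolerant parse that tries '%f' (microseconds) first. The timestamp value is hypothetical, and this only illustrates the parsing problem; it is not a patch for Airflow's view code.

```python
from datetime import datetime

# Hypothetical timestamp with fractional seconds, similar in shape to the
# execution_date of a manually triggered run (the value itself is made up).
raw = "2020-01-01 12:00:00.123456"


def parse_strict(value):
    # Same format string as in the traceback: no field for fractional seconds.
    return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")


def parse_tolerant(value):
    # Try the format with microseconds (%f) first, then fall back to whole seconds.
    for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError("unrecognised timestamp: %r" % value)


try:
    parse_strict(raw)
except ValueError as exc:
    print(exc)  # unconverted data remains: .123456

print(parse_tolerant(raw))  # 2020-01-01 12:00:00.123456
```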

Recommended answer

To accomplish this, you may want to use branching, which, as the name suggests, lets you follow different execution paths depending on some condition, just like an if in any programming language.

You can use the BranchPythonOperator (described in the official Airflow documentation) to achieve this. The operator is configured with a python_callable, a function that returns the task_id to execute next; that task must, of course, be directly downstream of the BranchPythonOperator itself.
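
A minimal sketch of such a callable, assuming you decide whether to skip the initial tasks through the run's trigger configuration: a hypothetical {"skip_initial": true} conf, which Airflow exposes to the callable as context["dag_run"].conf. The task ids "run_initial_tasks" and "join" are also hypothetical; both would have to be direct downstream tasks of the branch. The function needs no Airflow import, so the demo below feeds it a stand-in object in place of a real DagRun.

```python
from types import SimpleNamespace

# Hypothetical task ids: both must be tasks directly downstream of the
# BranchPythonOperator that calls choose_start.
RUN_INITIAL = "run_initial_tasks"
SKIP_TO = "join"


def choose_start(**context):
    """Return the task_id the branch should follow next.

    Assumes the run was triggered with a conf such as {"skip_initial": true};
    Airflow exposes the trigger conf as context["dag_run"].conf.
    """
    dag_run = context.get("dag_run")
    # A run triggered without any conf may have conf set to None or {}.
    conf = getattr(dag_run, "conf", None) or {}
    if conf.get("skip_initial"):
        return SKIP_TO
    return RUN_INITIAL


if __name__ == "__main__":
    # Stand-in objects for Airflow's DagRun, used only to exercise the logic.
    print(choose_start(dag_run=SimpleNamespace(conf={"skip_initial": True})))  # join
    print(choose_start(dag_run=SimpleNamespace(conf=None)))  # run_initial_tasks
```

With Airflow 2 you would then trigger the DAG with something like `airflow dags trigger -c '{"skip_initial": true}' <dag_id>` (or `airflow trigger_dag -c ...` on 1.10, where the operator also needs provide_context=True for the callable to receive the context).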

Using branching will set the skipped tasks to the proper state automatically, as mentioned in the documentation:

All other "branches" or directly downstream tasks are marked with a state of skipped so that these paths can’t move forward. The skipped states are propagated downstream to allow for the DAG state to fill up and the DAG run’s state to be inferred.

The resulting DAG would look something like the following:

Branching is documented in the official Apache Airflow documentation.