Airflow - mark a specific task_id of given dag_id and run_id as success or failure
Question
Can I externally (e.g. via an HTTP request) mark a specific task_id associated with a dag_id and run_id as success/failure?
My task is a long-running task on an external system, and I don't want my task to poll that system for its status, since we may well have several thousand tasks running at the same time.
Ideally I want my task to:
- make an HTTP request to start my external job
- sleep
- once the job is done, have it (the external system, or a post-build action of my job) notify Airflow that the task (identified by task_id, dag_id and run_id) has finished
Thanks
Answer
You can solve this by sending SQL queries directly to Airflow's metadata DB:
UPDATE task_instance
SET state = 'success',
    try_number = 0
WHERE task_id = 'YOUR-TASK-ID'
  AND dag_id = 'YOUR-DAG-ID'
  AND execution_date = '2019-06-27T16:56:17.789842+00:00';
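As a minimal sketch of issuing that UPDATE from Python, the snippet below uses an in-memory SQLite database as a stand-in for Airflow's metadata DB (which in a real deployment is usually Postgres or MySQL, so you would connect there instead). The task_instance table and column names match Airflow's schema; the row data and the narrowed-down table definition are illustrative only.

```python
import sqlite3

# In-memory stand-in for the metadata DB; real deployments would connect
# to the actual metadata database (e.g. Postgres) instead.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("          # much narrower than the real table
    "  task_id TEXT, dag_id TEXT, execution_date TEXT,"
    "  state TEXT, try_number INTEGER)"
)
conn.execute(
    "INSERT INTO task_instance VALUES "
    "('my_task', 'my_dag', '2019-06-27T16:56:17.789842+00:00', 'running', 3)"
)

def mark_task_instance(conn, dag_id, task_id, execution_date, state="success"):
    """Set the task instance's state and reset try_number, as in the answer."""
    conn.execute(
        "UPDATE task_instance"
        " SET state = ?, try_number = 0"
        " WHERE task_id = ? AND dag_id = ? AND execution_date = ?",
        (state, task_id, dag_id, execution_date),
    )

mark_task_instance(conn, "my_dag", "my_task",
                   "2019-06-27T16:56:17.789842+00:00")
state, try_number = conn.execute(
    "SELECT state, try_number FROM task_instance"
).fetchone()
print(state, try_number)  # success 0
```

Using parameterized queries (the `?` placeholders) rather than string interpolation matters here, since task and DAG IDs come from external callers.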
Notes:
- The execution_date filter is crucial: Airflow identifies DagRuns by execution_date, not really by their run_id. This means you really need to look up your DagRun's execution/run date to make this work.
- The try_number = 0 part is added because sometimes Airflow will reset the task back to failed if it notices that try_number is already at its limit (max_tries).
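Since the question identifies runs by run_id but the UPDATE needs execution_date, the mapping between the two can be read from the dag_run table in the same metadata DB. The sketch below again uses an in-memory SQLite stand-in; the dag_run column names match Airflow's schema, while the inserted row is made-up demo data.

```python
import sqlite3

# In-memory stand-in for the metadata DB's dag_run table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, execution_date TEXT)"
)
conn.execute(
    "INSERT INTO dag_run VALUES "
    "('my_dag', 'manual__2019-06-27T16:56:17', "
    "'2019-06-27T16:56:17.789842+00:00')"
)

def execution_date_for_run(conn, dag_id, run_id):
    """Translate a (dag_id, run_id) pair into the execution_date
    that the task_instance UPDATE filters on."""
    row = conn.execute(
        "SELECT execution_date FROM dag_run WHERE dag_id = ? AND run_id = ?",
        (dag_id, run_id),
    ).fetchone()
    return row[0] if row else None

exec_date = execution_date_for_run(conn, "my_dag",
                                   "manual__2019-06-27T16:56:17")
print(exec_date)  # 2019-06-27T16:56:17.789842+00:00
```

With that lookup in place, the external system only needs to know dag_id and run_id, which matches the workflow described in the question.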
You can see this in Airflow's source code here: https://github.com/apache/airflow/blob/750cb7a1a08a71b63af4ea787ae29a99cfe0a8d9/airflow/models/dagrun.py#L203