Can Airflow persist access to metadata of short-lived dynamically generated tasks?


Problem description

I have a DAG that, whenever there are files detected by FileSensor, generates tasks for each file to (1) move the file to a staging area, (2) trigger a separate DAG to process the file.

FileSensor -> Move(File1) -> TriggerDAG(File1) -> Done
          |-> Move(File2) -> TriggerDAG(File2) -^

In the DAG definition file, the middle tasks are generated by iterating over the directory that FileSensor is watching, a bit like this:

# def generate_move_task(f: Path) -> BashOperator
# def generate_dag_trigger(f: Path) -> TriggerDagRunOperator

with dag:
  for filepath in Path(WATCH_DIR).glob("*"):
    sensor_task >> generate_move_task(filepath) >> generate_dag_trigger(filepath)
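The core of the problem is that this loop runs at DAG *parse* time, so the task set depends on whatever the directory happens to contain at each re-parse. A minimal pure-Python sketch of that behavior (no Airflow dependency; `parse_dag` is a hypothetical stand-in for the scheduler re-evaluating the DAG file):

```python
from pathlib import Path
import tempfile

def parse_dag(watch_dir: Path) -> list:
    """Simulate Airflow re-parsing the DAG file: the task list is
    rebuilt from whatever files exist in watch_dir right now."""
    tasks = []
    for filepath in sorted(watch_dir.glob("*")):
        tasks.append("move_%s" % filepath.name)
        tasks.append("trigger_dag_%s" % filepath.name)
    return tasks

with tempfile.TemporaryDirectory() as d:
    watch = Path(d)
    (watch / "file1.csv").touch()
    tasks_before = parse_dag(watch)  # file present: Move + TriggerDAG exist
    (watch / "file1.csv").unlink()   # the Move task has run
    tasks_after = parse_dag(watch)   # next parse: the tasks vanish
    print(tasks_before, tasks_after)
```

Once the file is gone, `parse_dag` returns an empty task list, which is exactly why the webserver can no longer render the tasks' details.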

The Move task moves the files that lead to the task generation, so the next DAG run won't have FileSensor re-trigger either Move or TriggerDAG tasks for this file. In fact, the scheduler won't generate the tasks for this file at all, since after all files go through Move, the input directory has no contents to iterate over anymore.

This leads to two problems:

  1. After execution, the task logs and rendered templates are no longer available. The Graph View only shows the DAG as it is now (empty), not as it was at runtime. (The Tree View shows the tasks' runs and states, but clicking on a "square" and picking any details leads to an Airflow error.)
  2. The downstream tasks can be memory-holed due to a race condition. The first task is to move the originating file to a staging area. If that takes longer than the scheduler polling period, the scheduler no longer collects the downstream TriggerDAG(File1) task, which means that task is not scheduled to be executed even though the upstream task ran successfully. It's as if the downstream task never existed.

The race condition issue is solved by changing the task sequence to Copy(File1) -> TriggerDAG(File1) -> Remove(File1), but the broader problem remains: is there a way to persist dynamically generated tasks, or at least a way to consistently access them through the Airflow interface?
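The reordering works because the original file stays in the watch directory until after the downstream trigger has fired, so re-parses during that window still see the full task chain. A pure-Python sketch of the ordering (no Airflow dependency; `copy_trigger_remove` and the callback are hypothetical stand-ins for the three tasks):

```python
import shutil
import tempfile
from pathlib import Path

def copy_trigger_remove(src: Path, staging: Path, trigger) -> Path:
    staged = staging / src.name
    shutil.copy2(src, staged)  # Copy: the original stays in the watch dir,
                               # so re-parses still see this file's tasks
    trigger(staged)            # TriggerDAG: downstream work is created
    src.unlink()               # Remove: only now drop the original
    return staged

with tempfile.TemporaryDirectory() as d:
    root = Path(d)
    watch = root / "watch"
    staging = root / "staging"
    watch.mkdir()
    staging.mkdir()
    f = watch / "file1.csv"
    f.write_text("payload")
    triggered = []
    staged = copy_trigger_remove(f, staging, lambda p: triggered.append(p.name))
    staged_exists = staged.exists()   # True: copy landed in staging
    original_exists = f.exists()      # False: removed only after trigger
    print(triggered, staged_exists, original_exists)
```

The invariant to preserve is simply that the file's removal happens strictly after the trigger, never before.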

Recommended answer

The short answer to the title question is: as of Airflow 1.10.11, no, this doesn't seem possible as stated. To render DAG/task details, the Airflow webserver always consults the DAGs and tasks as they are currently defined and collected into the DagBag. If the definition changes or disappears, tough luck. The dashboard just shows the log entries in the table; it doesn't probe the logs for prior logic (nor does it seem to store much of it beyond the headline).

y2k-shubham provides an excellent solution to the unspoken question of "how can I write DAGs/tasks so that the transient metadata are accessible?" The subtext of his solution: convert the transient metadata into something Airflow stores per task run, but keep the tasks themselves fixed. XCom is the solution he uses here, and it does show up in the task instance details/logs.
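The pattern is: keep a fixed set of tasks and push the per-run file list as data, so the metadata survives in Airflow's XCom table even after the files are gone. A pure-Python sketch of the idea (no Airflow dependency; the `XCOM` dict stands in for Airflow's XCom table, and the function names are hypothetical):

```python
import shutil
import tempfile
from pathlib import Path

XCOM = {}  # stands in for Airflow's XCom table (task_id -> pushed value)

def discover_files(watch_dir: Path) -> list:
    """Fixed task: the directory is scanned at *run* time, and the
    return value is what Airflow would persist as an XCom."""
    found = sorted(p.name for p in watch_dir.glob("*"))
    XCOM["discover_files"] = found
    return found

def move_files(watch_dir: Path, staging: Path) -> list:
    """Fixed task: pulls the file list from XCom instead of having
    one task generated per file at parse time."""
    names = XCOM["discover_files"]
    for name in names:
        shutil.move(str(watch_dir / name), str(staging / name))
    XCOM["move_files"] = names
    return names

with tempfile.TemporaryDirectory() as d:
    root = Path(d)
    watch = root / "in"
    staging = root / "staging"
    watch.mkdir()
    staging.mkdir()
    (watch / "a.csv").touch()
    (watch / "b.csv").touch()
    discover_files(watch)
    moved = move_files(watch, staging)
    print(XCOM)  # the per-run metadata survives in the XCom record
```

Because the DAG's task structure never changes between parses, the webserver can always render these tasks, and the per-run file list remains inspectable as an XCom value on each task instance.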

Will Airflow implement persistent interface access to fleeting one-time tasks whose definition disappears from the DagBag? It's possible but unlikely, for two reasons:

  1. It would require the webserver to probe the historical logs instead of just the current DagBag when rendering the dashboard, which would require extra infrastructure to keep the web interface snappy, and could make the display very confusing.
  2. As y2k-shubham notes in a comment to another question of mine, fleeting and changing tasks/DAGs are an Airflow anti-pattern. I'd imagine that would make this a tough sell as the next feature.

