How often is the dag definition file read during a single dag run (is the dag reevaluated / recalculated every time a task runs / fires)?

Question

How often is a dag definition file read during a single dag run?

Have a large dag that takes a long time to build (~1-3 min). Looking at the logs of each task as the dag is running, it appears that the dag definition file is being executed for every task before it runs...

*** Reading local file: /home/airflow/airflow/logs/mydag/mytask/2020-01-30T04:51:34.621883+00:00/1.log
[2020-01-29 19:02:10,844] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: mydag.mytask2020-01-30T04:51:34.621883+00:00 [queued]>
[2020-01-29 19:02:10,866] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: mydag.mytask2020-01-30T04:51:34.621883+00:00 [queued]>
[2020-01-29 19:02:10,866] {taskinstance.py:866} INFO - 
--------------------------------------------------------------------------------
[2020-01-29 19:02:10,866] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-01-29 19:02:10,866] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-01-29 19:02:10,883] {taskinstance.py:887} INFO - Executing <Task(BashOperator): precheck_db_perms> on 2020-01-30T04:51:34.621883+00:00
[2020-01-29 19:02:10,887] {standard_task_runner.py:52} INFO - Started process 140570 to run task
[2020-01-29 19:02:11,048] {logging_mixin.py:112} INFO - [2020-01-29 19:02:11,047] {dagbag.py:403} INFO - Filling up the DagBag from /home/airflow/airflow/dags/mydag.py
[2020-01-29 19:02:11,052] {logging_mixin.py:112} INFO - <output from my dag definition file>
[2020-01-29 19:02:11,101] {logging_mixin.py:112} INFO - <more output from my dag definition file>
....
....
....
[2020-01-29 19:02:58,651] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: mydag.mytask 2020-01-30T04:51:34.621883+00:00 [running]> airflowetl.co.local
[2020-01-29 19:02:58,674] {bash_operator.py:81} INFO - Tmp dir root location: 
 /tmp
[2020-01-29 19:02:58,674] {bash_operator.py:91} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=me@co.org
AIRFLOW_CTX_DAG_OWNER=me
AIRFLOW_CTX_DAG_ID=mydag
AIRFLOW_CTX_TASK_ID=mytask
AIRFLOW_CTX_EXECUTION_DATE=2020-01-30T04:51:34.621883+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2020-01-30T04:51:34.621883+00:00
[2020-01-29 19:02:58,675] {bash_operator.py:105} INFO - Temporary script location: /tmp/airflowtmphwu1ckty/mytaskbmnsizw5
<only now does the actual task logic output seem to start>

where the first whole part of the log seems to imply that the dag file is being run each time a new task is run (I see this for every task).

Is this indeed what is happening here? Is this normal / expected behavior? Note that since my dag takes some time to build, this would mean that that time is being multiplied across every task in the dag (of which there are many in this case), which makes me think this is either not normal or there is some best practice I am not using here. Could anyone with more airflow experience help explain what I'm seeing here?
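One way to see how much each task is paying (a hedged sketch, not from the original post) is to time a single parse of the DAG file yourself: importing or exec'ing it runs all the top-level build code, which is roughly what each worker does before running a task. A small stand-in file is created here so the snippet is self-contained; in practice you would point `dag_path` at the real file (e.g. the `/home/airflow/airflow/dags/mydag.py` seen in the logs above).

```python
# Sketch: measure how long one parse of a DAG definition file takes,
# since that cost is paid again by every task instance.
import os
import tempfile
import time

# Stand-in DAG file whose top-level code is slow to run.
dag_src = "import time\ntime.sleep(0.05)  # stand-in for a slow DAG build\n"
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(dag_src)
    dag_path = f.name

# Exec the file the way an importer would, and time it.
start = time.perf_counter()
exec(compile(open(dag_path).read(), dag_path, "exec"), {})
elapsed = time.perf_counter() - start
os.unlink(dag_path)

print(f"one parse took {elapsed:.2f}s")
```

If one parse takes minutes, that is minutes added to every task in the run.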

Answer

After some discussion on the Airflow mailing list, it turns out that Airflow builds the dag for each task when it is run, so each task includes the overhead of building the dag again (which in my case was very significant). From the conversation:

Yes, every task is run in process isolation (and could be running across separate machines), so every task builds the DAG from scratch.

Fundamentally tasks are defined by the code itself, and so a worker process can only determine what code to execute when running by parsing the python code that defines it. Maybe there are some cases where a task can be well defined outside of the full context of the dag that contains it but that doesn't apply in general.
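That behavior can be illustrated with a minimal sketch (hypothetical names, not real Airflow code): anything at the top level of a DAG definition file runs on every parse, and each task instance triggers a fresh parse in its own worker process, so a slow top-level build is paid once per task rather than once per run.

```python
# Sketch: top-level code in a DAG file is re-executed on every parse.
import time

PARSE_COUNT = 0

def build_dag_config():
    """Stands in for expensive work (e.g. DB queries) done at module import."""
    global PARSE_COUNT
    PARSE_COUNT += 1
    time.sleep(0.01)  # pretend this takes 1-3 minutes
    return {"tables": ["a", "b", "c"]}

# Top-level call: runs when the scheduler first parses the file...
config = build_dag_config()

# ...and again each time a worker re-parses the file to run one task.
# Simulate three task instances, each in its own process:
for _ in range(3):
    build_dag_config()

print(PARSE_COUNT)  # 1 initial parse + 3 per-task parses = 4
```

With many tasks, the build cost multiplies linearly with the task count.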

It's not that the task cares about the DAG structure, it's that tasks only exist as part of the DAG; the only way to get a task in Airflow is to get the DAG first. At least as far as I know.

Ultimately what I did was pickle the configs I use to build the graph, making that config-building step its own scheduled dag that then triggers my actual dag (whose schedule is set to None). My dag is created in a loop that runs DB queries to make DAG branches for a set of tables, so those queries were the overhead: they were being run for every single task, when they really only need to run once to produce the config dict for the DAG.
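A hedged sketch of that pattern (all names here, `CACHE_PATH`, `fetch_tables_from_db`, are hypothetical): the expensive queries run in one scheduled builder task that writes the config to disk, and the real DAG file only does a cheap file read at its top level, so re-parsing it per task stays fast. The original answer used pickle; JSON is used here to keep the example self-contained.

```python
# Sketch: cache the expensive DAG-building config so each parse is cheap.
import json
import os
import tempfile

CACHE_PATH = os.path.join(tempfile.gettempdir(), "mydag_config.json")

def fetch_tables_from_db():
    """Stand-in for the slow DB queries; runs only in the builder DAG."""
    return {"tables": ["orders", "customers", "invoices"]}

def refresh_config():
    """Task body of the scheduled config-builder DAG: query once, cache."""
    with open(CACHE_PATH, "w") as f:
        json.dump(fetch_tables_from_db(), f)

def load_config():
    """Called at the top level of the real DAG file: a fast file read,
    so the per-task re-parse no longer pays for the DB queries."""
    with open(CACHE_PATH) as f:
        return json.load(f)

refresh_config()
print(load_config()["tables"])  # the real DAG would loop over these tables
```

The builder DAG runs on a schedule and then triggers the real DAG, whose own schedule is set to None.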

