Airflow External sensor gets stuck at poking


Problem Description


I want one dag to start after the completion of another dag. One solution is to use the external task sensor; below you can find my solution. The problem I encounter is that the dependent dag is stuck at poking. I checked this answer and made sure that both dags run on the same schedule. My simplified code is as follows; any help would be appreciated. The leader dag:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('leader_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

the dependent dag:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 8),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('dependent_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

wait_for_task = ExternalTaskSensor(task_id='wait_for_task',
                                   external_dag_id='leader_dag',
                                   external_task_id='t1', dag=dag)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t1.set_upstream(wait_for_task)

the log for leader_dag: (screenshot not reproduced; per the answer it shows a run from 2018-10-13T19:08:11)

the log for dependent dag: (screenshot not reproduced; per the answer it shows poking from 19:14:04 to 19:14:34)

Solution

First, the task_id in the leader_dag is named print_date, but you set up your dependent_dag with a task wait_for_task that waits on a leader_dag task named t1. There is no task named t1. The Python variable you assigned the operator to in the .py file is not relevant; it is not used in the Airflow db, and so not by the sensor either. The sensor should be waiting on the task name print_date.

Second, your logs do not line up: the leader_dag run you show is not the one the dependent_dag is waiting for.

Finally, I can't recommend you use Airflow to schedule tasks every minute. Certainly not two dependent tasks together. Consider writing streaming jobs in a different system like Spark, or rolling your own Celery or Dask environment for this.

You could also avoid the ExternalTaskSensor entirely by adding a TriggerDagRunOperator to the end of your leader_dag to trigger the dependent_dag, and removing the schedule from the dependent_dag by setting its schedule_interval to None.
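A minimal sketch of that alternative, assuming the same Airflow 1.x import paths the question uses; the callable and task names below are illustrative, not taken from your code:

from airflow.operators.dagrun_operator import TriggerDagRunOperator

# --- at the end of leader_dag ---
def always_trigger(context, dag_run_obj):
    # returning the dag_run_obj unchanged means the trigger always fires;
    # a payload could be attached to it here if dependent_dag needed one
    return dag_run_obj

trigger_dependent = TriggerDagRunOperator(
    task_id='trigger_dependent_dag',
    trigger_dag_id='dependent_dag',
    python_callable=always_trigger,
    dag=dag)

t1.set_downstream(trigger_dependent)

# --- in dependent_dag: no schedule of its own, it only runs when triggered ---
dag = DAG('dependent_dag', default_args=default_args, catchup=False,
          schedule_interval=None)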

What I see in your logs is a leader_dag log from 2018-10-13T19:08:11. At best this would be the dagrun for execution_date 2018-10-13 19:07:00, because the minute period starting at 19:07 ends at 19:08, which is the earliest it can be scheduled. If that is the case, I see a delay of about 11 seconds between scheduling and execution. However, there can be multiple minutes of scheduling lag in Airflow.
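A small standard-library sketch of that arithmetic, using the timestamps quoted above:

from datetime import datetime, timedelta

# for a '* * * * *' schedule, execution_date marks the START of a one-minute
# interval; the dagrun can only be scheduled once that interval has closed
execution_date = datetime(2018, 10, 13, 19, 7)                # interval start
earliest_schedulable = execution_date + timedelta(minutes=1)  # 19:08, interval end
observed_start = datetime(2018, 10, 13, 19, 8, 11)            # from the leader_dag log
print(observed_start - earliest_schedulable)                  # 0:00:11 of scheduling lag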

I also see a log from the dependent_dag which runs from 19:14:04 to 19:14:34 and is looking for the completion of the corresponding 19:13:00 dagrun. There is no indication that your scheduler was lag-free enough to have started the 19:13:00 dagrun of leader_dag by 19:14:34. You could have better convinced me if you had shown it poking for 5 minutes or so. Of course it is never going to sense leader_dag.t1, because that isn't the name you gave the task shown.

So, Airflow has scheduling delay. If you had a few thousand dags in the system, it might be more than 1 minute, so with catchup=False you're going to get some runs that follow each other (i.e. 19:08, 19:09) and some runs that skip a minute (or 6), like 19:10 followed by 19:16. Since the delay is somewhat random on a dag-by-dag basis, you might get unaligned runs, with the sensor waiting forever, EVEN if you have the correct task id to wait for:

 wait_for_task = ExternalTaskSensor(
     task_id='wait_for_task', 
     external_dag_id='leader_dag',
-    external_task_id='t1',
+    external_task_id='print_date',
     dag=dag)
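If the two schedules can still drift, the sensor also takes an execution_delta to offset which leader_dag run it looks for, and a timeout (from the sensor base class) so it eventually fails instead of poking forever. A sketch only; the one-minute offset and five-minute timeout below are illustrative values, not something derived from your logs:

wait_for_task = ExternalTaskSensor(
    task_id='wait_for_task',
    external_dag_id='leader_dag',
    external_task_id='print_date',
    # illustrative: match the leader_dag run whose execution_date is one
    # minute before this dag's own execution_date
    execution_delta=timedelta(minutes=1),
    # illustrative: stop poking and fail after 5 minutes
    timeout=300,
    dag=dag)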
