Airflow ExternalTaskSensor Stuck
I am trying to get the Airflow ExternalTaskSensor to work, but so far I have not been able to get it to complete. It always seems to get stuck in the running state and never finishes, so the DAG cannot move on to the next task.
Here is the code I am using to test:
# Imports assume the Airflow 1.10.x module layout; adjust for your version.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

DEFAULT_ARGS = {
    'owner': 'NAME',
    'depends_on_past': False,
    'start_date': datetime(2019, 9, 9),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

external_watch_dag = DAG(
    'DAG-External_watcher-Test',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

start_op = DummyOperator(
    task_id='start_op',
    dag=external_watch_dag
)

trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag
)

external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    execution_delta=timedelta(minutes=-1),
    # execution_date_fn=datetime(2019, 9, 25),
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)

end_op = DummyOperator(
    task_id='end_op',
    dag=external_watch_dag
)

start_op >> trigger_external >> external_watch_op >> end_op
# start_op >> [external_watch_op, trigger_external]
# external_watch_op >> end_op

# Below is the setup for the dummy DAG that is called above by the Trigger and watched by the TaskSensor
dummy_dag = DAG(
    'DAG-Dummy',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

dummy_task = BashOperator(
    task_id='dummy_task',
    bash_command='sleep 10',
    dag=dummy_dag
)
I have tried tweaking this code in a number of ways but have not had any success with the ExternalTaskSensor.
Does anyone know how to solve this problem and get the ExternalTaskSensor to work properly? I have also read that scheduling intervals can cause issues when using the ExternalTaskSensor. Is it possible that part of the issue is that both DAGs have schedule_interval=None?
I did get this to work with both DAGs set to the exact same schedule_interval, but that will not work in production. The goal is for the main DAG, external-watch-dag, to run on a regular schedule and trigger DAG-Dummy during its run, with DAG-Dummy itself having schedule_interval=None.
Any help is greatly appreciated.
By default, the ExternalTaskSensor monitors the external_dag_id for a run with the same execution date as the sensor DAG. With execution_delta you can set a time delta between the sensor DAG and the external DAG so that the sensor looks for the correct execution_date. This works well when both DAGs run on a schedule, because you know this time delta exactly.
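To make the lookup concrete, here is a minimal pure-Python sketch of how the sensor derives the date it polls for. It assumes the Airflow 1.10.x behavior, where the target is the sensor's own execution date minus execution_delta. The helper name target_execution_date is made up for illustration and is not part of Airflow.

```python
from datetime import datetime, timedelta
from typing import Optional


def target_execution_date(sensor_execution_date: datetime,
                          execution_delta: Optional[timedelta] = None) -> datetime:
    """Return the external execution date the sensor would look for.

    Illustrative helper only (assumed behavior: date minus delta).
    """
    if execution_delta is None:
        # Default: same execution date as the sensor's own DAG run.
        return sensor_execution_date
    return sensor_execution_date - execution_delta


sensor_date = datetime(2019, 9, 25, 0, 0)
print(target_execution_date(sensor_date))                         # 2019-09-25 00:00:00
print(target_execution_date(sensor_date, timedelta(hours=1)))     # 2019-09-24 23:00:00
print(target_execution_date(sensor_date, timedelta(minutes=-1)))  # 2019-09-25 00:01:00
```

Under this assumption, the negative delta from the question, timedelta(minutes=-1), makes the sensor look one minute after its own execution date, which a triggered run is unlikely to hit exactly.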
The problem: when a DAG is triggered manually or by another DAG, you cannot know for sure the exact execution date of either of these two DAGs.
The solution: because you are using the TriggerDagRunOperator, you can set its execution_date parameter. This makes sure that the execution date of your DAG and that of the external DAG are the same. From the docs:
execution_date (str or datetime.datetime) – Execution date for the dag (templated)
So your code will look like this:
trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag,
    execution_date="{{ execution_date }}",  # Use the template to get the current execution date
)

external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)
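Why passing the execution date through helps can be shown with a toy in-memory model. This is only a sketch, not Airflow code. It assumes the sensor succeeds only when a run of the external DAG exists with exactly the execution date it polls for, and that a trigger without an explicit execution_date stamps the new run with the wall-clock time of the trigger. The names runs, trigger, and sensor_poke are hypothetical.

```python
from datetime import datetime
from typing import Optional, Set, Tuple

# Toy stand-in for the metadata database: (dag_id, execution_date) of finished runs.
runs: Set[Tuple[str, datetime]] = set()


def trigger(dag_id: str, now: datetime,
            execution_date: Optional[datetime] = None) -> None:
    """Record a run; without an explicit date it is stamped with the trigger time."""
    runs.add((dag_id, execution_date if execution_date is not None else now))


def sensor_poke(external_dag_id: str, sensor_execution_date: datetime) -> bool:
    """Succeed only on an exact execution-date match (assumed default behavior)."""
    return (external_dag_id, sensor_execution_date) in runs


scheduled = datetime(2019, 9, 25, 0, 0)               # execution date of the watcher run
wall_clock = datetime(2019, 9, 26, 0, 0, 7, 123456)   # moment the trigger task actually ran

trigger('DAG-Dummy', now=wall_clock)                  # no execution_date passed
print(sensor_poke('DAG-Dummy', scheduled))            # False: dates differ, sensor keeps waiting

trigger('DAG-Dummy', now=wall_clock, execution_date=scheduled)
print(sensor_poke('DAG-Dummy', scheduled))            # True: dates match
```

In this model the first trigger leaves the sensor waiting indefinitely, which mirrors the stuck behavior described in the question, while the second trigger gives it an exact match.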