Airflow ExternalTaskSensor Stuck

Problem Description

I am trying to get the Airflow ExternalTaskSensor to work, but so far I have not been able to get it to complete. It always seems to get stuck running and never finishes, so the DAG cannot move on to the next task.

Here is the code I am using to test:


```python
# Import paths assume Airflow 1.10.x, the release line current when this was asked
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

DEFAULT_ARGS = {
    'owner': 'NAME',
    'depends_on_past': False,
    'start_date': datetime(2019, 9, 9),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

# Watcher DAG: triggers DAG-Dummy, then waits for its dummy_task to finish
external_watch_dag = DAG(
    'DAG-External_watcher-Test',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

start_op = DummyOperator(
    task_id='start_op',
    dag=external_watch_dag
)


trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag
)

external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    # The sensor waits for the DAG-Dummy run stamped execution_date - execution_delta,
    # i.e. exactly one minute after this run's own execution_date
    execution_delta=timedelta(minutes=-1),
    # execution_date_fn=datetime(2019, 9, 25),
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)

end_op = DummyOperator(
    task_id='end_op',
    dag=external_watch_dag
)

start_op >> trigger_external >> external_watch_op >> end_op
# start_op >> [external_watch_op, trigger_external]
# external_watch_op >> end_op


# Below is the setup for the dummy DAG that is called above by the Trigger and watched by the TaskSensor
dummy_dag = DAG(
    'DAG-Dummy',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

dummy_task = BashOperator(
    task_id='dummy_task',
    bash_command='sleep 10',
    dag=dummy_dag
)
```

I have tried tweaking this code in a number of ways but have not had any success with the ExternalTaskSensor.

Does anyone know how to solve this problem and get the ExternalTaskSensor to work properly? I have also read that scheduling intervals can cause issues when using the ExternalTaskSensor. Is it possible that part of the issue is that both DAGs have schedule_interval=None?

I had gotten this to work with both DAGs set to the exact same schedule_interval, but that will not work in production. The goal is to have the main DAG, external-watch-dag, run on a regular schedule and trigger DAG-Dummy during its run, while DAG-Dummy itself has schedule_interval=None.

Any help is greatly appreciated.

Solution

By default, the ExternalTaskSensor monitors the run of external_dag_id that has the same execution date as the sensor's own DAG run. With execution_delta you can set a time difference between the sensor DAG and the external DAG (the sensor subtracts it from its own execution_date), so that it looks for the correct execution_date to monitor. This works well when both DAGs run on a schedule, because then you know this timedelta exactly.
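
A minimal plain-Python sketch of that date arithmetic, with no Airflow import. `target_execution_date` is a hypothetical helper, not part of Airflow's API; it assumes, as the execution_delta documentation describes, that the sensor subtracts execution_delta from its own execution_date.

```python
from datetime import datetime, timedelta


def target_execution_date(sensor_execution_date, execution_delta=None):
    """Hypothetical helper mirroring how the sensor picks the external run to watch."""
    if execution_delta is None:
        # Default: the external run with the same execution_date as the sensor's run
        return sensor_execution_date
    # The delta is subtracted, so a negative delta points to a *later* run
    return sensor_execution_date - execution_delta


# Both DAGs scheduled daily: the external DAG at 00:00, the sensor's DAG at 01:00
print(target_execution_date(datetime(2019, 9, 25, 1, 0), timedelta(hours=1)))
# The question's setting: execution_delta=timedelta(minutes=-1)
print(target_execution_date(datetime(2019, 9, 25, 12, 0), timedelta(minutes=-1)))
```

With schedules on both sides the delta is a known constant. The question's -1 minute asks the sensor for a DAG-Dummy run stamped exactly one minute after the watcher's own execution date.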

The problem: when a DAG is triggered manually or by another DAG, you cannot know in advance the exact execution date of either DAG run, so no fixed execution_delta will reliably point at the right run.
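
Here is a small simulation of why the sensor never finishes, using made-up timestamps. It assumes that a run triggered without an explicit execution_date is stamped with the current time, and that the sensor only accepts a run whose execution_date matches its target exactly.

```python
from datetime import datetime, timedelta

# Hypothetical values: the watcher run's execution_date, and the moment
# trigger_external fires (assumed to become DAG-Dummy's execution_date,
# microseconds included, because no execution_date is passed)
watcher_execution_date = datetime(2019, 9, 25, 12, 0, 0)
trigger_time = datetime(2019, 9, 25, 12, 0, 47, 318204)

# What the question's sensor looks for with execution_delta=timedelta(minutes=-1)
expected = watcher_execution_date - timedelta(minutes=-1)

# Execution dates of the DAG-Dummy runs that actually exist
existing_runs = {trigger_time}

# The lookup is an exact match, so nothing is found and the sensor keeps poking
found = expected in existing_runs
print(found)
```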

The solution: because you are using the TriggerDagRunOperator, you can set its execution_date parameter. This ensures that your DAG run and the triggered external DAG run have the same execution date. From the docs:

> execution_date (str or datetime.datetime) – Execution date for the dag (templated)

So your code will look like this:

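```python
# Drop-in replacements for the two tasks in the question's DAG file
trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag,
    # Templated: renders to this run's execution_date, so the triggered
    # DAG-Dummy run gets exactly the same execution_date
    execution_date="{{ execution_date }}",
)

# No execution_delta: the sensor now looks for the DAG-Dummy run with the
# same execution_date as its own run
external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)
```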
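
A plain-Python sketch of why the dates now line up, under two assumptions: the template renders the watcher's execution_date to an ISO 8601 string, and the operator parses that string back into a datetime. Airflow does the parsing with its own timezone helpers; `datetime.fromisoformat` is only a stdlib stand-in here.

```python
from datetime import datetime, timezone

# Hypothetical execution_date of one watcher run (timezone-aware UTC)
watcher_execution_date = datetime(2019, 9, 25, 12, 0, tzinfo=timezone.utc)

# execution_date="{{ execution_date }}" reaches the operator as a string...
rendered = watcher_execution_date.isoformat()
print(rendered)  # 2019-09-25T12:00:00+00:00

# ...which is parsed back into the execution_date of the new DAG-Dummy run
triggered_run_date = datetime.fromisoformat(rendered)

# Without execution_delta the sensor targets its own run's execution_date
sensor_target = watcher_execution_date
print(sensor_target == triggered_run_date)  # True
```

Because both dates are identical, the answer's sensor no longer needs an execution_delta at all.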
