气流ExternalTaskSensor被卡住 [英] Airflow ExternalTaskSensor gets stuck
问题描述
我正在尝试使用ExternalTaskSensor,它卡住了已成功完成的另一个DAG任务。
这里是第一个DAG a完成其任务,然后应该触发通过ExternalTaskSensor的第二个DAG b。相反,它被卡在为a.first_task戳上。
第一个DAG:
导入日期时间
来自气流从airflow.operators.python_operator导入DAG
导入PythonOperator
dag = DAG(
dag_id ='a',
default_args = {'owner':'airflow' ,'start_date':datetime.datetime.now()},
schedule_interval = None
)
def do_first_task():
print('第一个任务完成')
PythonOperator(
task_id ='first_task',
python_callable = do_first_task,
dag = dag)
第二个DAG:
导入日期时间
从airflow.operators.python_operator导入DAG
从airflow.operators.sensors导入PythonOperator
从externalflowSensor
dag = DAG(
dag_id ='b',
default_args = {'owner':'airflow','start_date':datetime.datetime.now()},
schedule_interval =无
)
def do_second_task ():
打印(第二项任务已完成)
ExternalTaskSensor(
task_id ='wait_for_the_first_task_to_be_completed',
external_dag_id ='a',
external_task_id ='first_task',
dag = dag)> > \
PythonOperator(
task_id ='second_task',
python_callable = do_second_task,
dag = dag)
我在这里想念什么?
ExternalTaskSensor
假定您依赖于dag中的任务
这意味着在您的情况下, a
和 b
需要按照相同的时间表运行(例如每天的9:00 am或w / e)。
否则,您需要使用实例化 ExternalTaskSensor
时, execution_delta
或 execution_date_fn
。
以下是操作员内部的文档,以帮助进一步阐明:
:paramexecution_delta:与前一次执行到
的时间差,默认值为与当前任务相同的execution_date。
对于昨天,请使用[positive!] datetime.timedelta(days = 1)。
execution_delta或execution_date_fn都可以传递给
ExternalTaskSensor,但不能两者都传递。
:type execute_delta:datetime.timedelta
:param execute_date_fn:接收当前执行日期
并返回所需执行日期以进行查询的函数。可以将execution_delta
或execution_date_fn传递给ExternalTaskSensor,但不能两者都传递。
:键入execution_date_fn:可调用的
I'm trying to use ExternalTaskSensor and it gets stuck at poking another DAG's task, which has already been successfully completed.
Here, a first DAG "a" completes its task and after that a second DAG "b" through ExternalTaskSensor is supposed to be triggered. Instead it gets stuck at poking for a.first_task.
First DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_first_task():
print('First task is done')
PythonOperator(
task_id='first_task',
python_callable=do_first_task,
dag=dag)
Second DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
dag = DAG(
dag_id='b',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_second_task():
print('Second task is done')
ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='a',
external_task_id='first_task',
dag=dag) >> \
PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
What am I missing here?
ExternalTaskSensor
assumes that you are dependent on a task in a dag run with the same execution date.
This means that in your case dags a
and b
need to run on the same schedule (e.g. every day at 9:00am or w/e).
Otherwise you need to use the execution_delta
or execution_date_fn
when you instantiate an ExternalTaskSensor
.
Here is the documentation inside the operator itself to help clarify further:
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task.
For yesterday, use [positive!] datetime.timedelta(days=1). Either
execution_delta or execution_date_fn can be passed to
ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date
and returns the desired execution date to query. Either execution_delta
or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
这篇关于气流ExternalTaskSensor被卡住的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!