Airflow Debugging: How to skip backfill job execution when running a DAG in VS Code
Question
I have set up Airflow and am running a DAG using the following VS Code debug configuration:
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "justMyCode": false,
            "env": {
                "AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
                "AIRFLOW__DEBUG__FAIL_FAST": "True",
                "LC_ALL": "en_US.UTF-8",
                "LANG": "en_US.UTF-8"
            }
        }
    ]
}
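The "env" block works because Airflow reads configuration overrides from environment variables named AIRFLOW__{SECTION}__{KEY}, so AIRFLOW__CORE__EXECUTOR sets the executor option in the [core] section. The helper below is a hypothetical, stdlib-only sketch of that naming convention (airflow_overrides is an illustrative name, not Airflow's own config parser, and it ignores variants such as the _CMD and _SECRET suffixes); it lists which Airflow settings a given environment overrides.

```python
# Airflow maps environment variables named AIRFLOW__{SECTION}__{KEY}
# (double underscores as separators) onto config options.
# airflow_overrides is a hypothetical helper that only illustrates this convention.
PREFIX = "AIRFLOW__"


def airflow_overrides(env):
    """Return {(section, key): value} for every AIRFLOW__SECTION__KEY variable."""
    overrides = {}
    for name, value in env.items():
        if not name.startswith(PREFIX):
            continue  # e.g. LC_ALL, LANG: not Airflow settings
        parts = name[len(PREFIX):].split("__", 1)
        if len(parts) != 2:
            continue  # not of the form SECTION__KEY
        section, key = parts
        overrides[(section.lower(), key.lower())] = value
    return overrides


# The "env" block from the launch configuration above.
launch_env = {
    "AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
    "AIRFLOW__DEBUG__FAIL_FAST": "True",
    "LC_ALL": "en_US.UTF-8",
    "LANG": "en_US.UTF-8",
}

if __name__ == "__main__":
    print(airflow_overrides(launch_env))
    # {('core', 'executor'): 'DebugExecutor', ('debug', 'fail_fast'): 'True'}
```

Calling airflow_overrides(dict(os.environ)) from inside the debugged process is a quick way to confirm that VS Code actually passed the variables to it.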
It runs the file, and my breakpoints in the DAG definitions break as expected. Then, at the end of the file, it executes dag.run()
and I wait forever for the DAG to backfill; my breakpoints inside the tasks' python_callable functions never break.
What Airflow secret am I not seeing?
Here is my DAG:
# Imports assumed (the original snippet omitted them): paths are for Airflow 2.0
# with the Amazon provider installed; on Airflow 1.10 the module paths differ.
from pprint import pprint

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
from airflow.utils.dates import days_ago

# scheduled to run every minute, poke for a new file every ten seconds
dag = DAG(
    dag_id='download-from-s3',
    start_date=days_ago(2),
    catchup=False,
    schedule_interval='*/1 * * * *',
    is_paused_upon_creation=False
)


def new_file_detection(**context):
    print("File found...")  # a breakpoint here never lands
    pprint(context)


init = BashOperator(
    task_id='init',
    bash_command='echo "My DAG initiated at $(date)"',
    dag=dag,
)

file_sensor = S3KeySensor(
    task_id='file_sensor',
    poke_interval=10,  # every 10 seconds
    timeout=60,
    bucket_key="s3://inbox/new/*",
    bucket_name=None,
    wildcard_match=True,
    soft_fail=True,
    dag=dag
)

file_found_message = PythonOperator(
    task_id='file_found_message',
    provide_context=True,
    python_callable=new_file_detection,
    dag=dag
)

init >> file_sensor >> file_found_message

if __name__ == '__main__':
    dag.clear(reset_dag_runs=True)
    dag.run()  # this triggers a backfill job
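One more thing worth checking in this DAG: new_file_detection only runs if file_sensor succeeds. The sketch below is a simplified, hypothetical model of how poke_interval=10, timeout=60 and soft_fail=True interact (poke, give up once the timeout is exceeded, and with soft_fail end as skipped instead of failed). It is not Airflow's BaseSensorOperator implementation; run_sensor, FakeClock and SensorTimeout are illustrative names, and the poke counts are those of this model only. With the default all_success trigger rule, a skipped sensor normally leads to its downstream tasks being skipped, so a breakpoint in new_file_detection would not be reached when no key matches s3://inbox/new/*.

```python
from dataclasses import dataclass


class SensorTimeout(Exception):
    """Raised by the model when the timeout is exceeded and soft_fail is False."""


@dataclass
class FakeClock:
    """Manually advanced clock so the sketch runs instantly instead of sleeping."""
    now: float = 0.0

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def run_sensor(poke, poke_interval, timeout, soft_fail, clock):
    """Hypothetical poke loop returning (outcome, number_of_pokes).

    outcome is "success" if poke() ever returns True, or "skipped" if the
    timeout is exceeded with soft_fail=True; otherwise SensorTimeout is raised.
    """
    started = clock.now
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return "success", pokes
        if clock.now - started > timeout:
            if soft_fail:
                return "skipped", pokes
            raise SensorTimeout(f"no match after {pokes} pokes")
        clock.sleep(poke_interval)


if __name__ == "__main__":
    # No S3 key ever matches: with soft_fail=True the modelled sensor ends up
    # skipped, so file_found_message would be skipped and never call its callable.
    print(run_sensor(lambda: False, poke_interval=10, timeout=60,
                     soft_fail=True, clock=FakeClock()))  # ('skipped', 8)
```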
Answer
This works for me as expected. I can set breakpoints at the DAG level or inside the Python callables' definitions and step through them with the VS Code debugger.
I'm using the same debug settings that you provided, but I changed the parameter reset_dag_runs=True to dag_run_state=State.NONE in the dag.clear() call, as specified on the DebugExecutor docs page. I believe this changed in one of the latest releases.
Regarding backfills, I'm setting catchup=False in the DAG arguments (it works either way). Important note: I'm running Airflow version 2.0.0.
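The question's own code comment notes that dag.run() triggers a backfill job, and the schedule largely decides how much work that backfill is. Assuming the backfill creates one run per complete schedule interval between the DAG's start_date and the current time (and does not apply catchup=False), the rough stdlib-only estimate below compares the question's '*/1 * * * *' schedule with start_date=days_ago(2) against the '@once' schedule used in the answer's example DAG. count_intervals is a hypothetical helper, and the fixed "now" is an arbitrary date chosen so the result is reproducible.

```python
from datetime import datetime, timedelta, timezone


def count_intervals(start, end, interval):
    """Number of complete schedule intervals between start and end.

    Rough model (assumption): one backfill run per complete fixed-length interval.
    """
    if end <= start:
        return 0
    return (end - start) // interval  # timedelta // timedelta -> int


# Fixed "now" so the example is reproducible; days_ago(2) is midnight UTC
# two days before today.
now = datetime(2021, 1, 10, 12, 0, tzinfo=timezone.utc)
start = (now - timedelta(days=2)).replace(hour=0, minute=0, second=0, microsecond=0)

# '*/1 * * * *' -> one interval per minute, i.e. 60 hours of minutes here.
every_minute = count_intervals(start, now, timedelta(minutes=1))
# '@once' -> a single run regardless of start_date.
once = 1

if __name__ == "__main__":
    print(every_minute, once)  # 3600 1
```

Thousands of runs, executed one task after another under the DebugExecutor, could plausibly look like "waiting forever". Passing explicit start_date and end_date arguments to dag.run() is another way to bound the backfill.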
Here is an example using the same code as example_xcom.py, which comes with the default installation:
Debug settings:
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "internalConsole",
            "justMyCode": false,
            "env": {
                "AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
                "AIRFLOW__DEBUG__FAIL_FAST": "True"
            }
        }
    ]
}
Example DAG:
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'excom_xample',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
    catchup=False
)

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    logging.info("log before PUSH")  # <<<<<<<<<<< Before landing on breakpoint
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and
    check if the pushed values match the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("PRINT Line after breakpoint ")  # <<<< After landing on breakpoint
    if pulled_value_1 != value_1:
        raise ValueError(
            f'The two values differ {pulled_value_1} and {value_1}')

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    if pulled_value_2 != value_2:
        raise ValueError(
            f'The two values differ {pulled_value_2} and {value_2}')

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=['push', 'push_by_returning'])
    if pulled_value_1 != value_1:
        raise ValueError(
            f'The two values differ {pulled_value_1} and {value_1}')
    if pulled_value_2 != value_2:
        raise ValueError(
            f'The two values differ {pulled_value_2} and {value_2}')


push1 = PythonOperator(
    task_id='push',
    dag=dag,
    python_callable=push,
)

push2 = PythonOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=push_by_returning,
)

pull = PythonOperator(
    task_id='puller',
    dag=dag,
    python_callable=puller,
)

pull << [push1, push2]

if __name__ == '__main__':
    from airflow.utils.state import State

    dag.clear(dag_run_state=State.NONE)
    dag.run()