Airflow Debugging: How to skip backfill job execution when running DAG in vscode


Problem description

I have set up Airflow and am running a DAG using the following VS Code debug configuration:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "justMyCode": false,
            "env":{
                "AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
                "AIRFLOW__DEBUG__FAIL_FAST": "True",
                "LC_ALL": "en_US.UTF-8",
                "LANG": "en_US.UTF-8"
            }
        }
    ]
}

It runs the file, and my breakpoints in the DAG definitions break as expected. Then, at the end of the file, it executes dag.run(), after which I wait forever for the DAG to backfill; my breakpoints inside the tasks' python_callable functions never hit.

What Airflow secret am I not seeing?

Here is my DAG:

# imports added for completeness (Airflow 1.10-style module paths are
# assumed here, to match the provide_context=True usage below)
from pprint import pprint

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.utils.dates import days_ago

# scheduled to run every minute, poke for a new file every ten seconds
dag = DAG(
    dag_id='download-from-s3',
    start_date=days_ago(2),
    catchup=False,
    schedule_interval='*/1 * * * *',
    is_paused_upon_creation=False
)

def new_file_detection(**context):
    print("File found...")  # a breakpoint here never lands
    pprint(context)

init = BashOperator(
    task_id='init',
    bash_command='echo "My DAG initiated at $(date)"',
    dag=dag,
)

file_sensor = S3KeySensor(
    task_id='file_sensor',
    poke_interval=10,  # poke for the file every 10 seconds
    timeout=60,
    bucket_key="s3://inbox/new/*",
    bucket_name=None,
    wildcard_match=True,
    soft_fail=True,
    dag=dag
)

file_found_message = PythonOperator(
    task_id='file_found_message',
    provide_context=True,
    python_callable=new_file_detection,
    dag=dag
)

init >> file_sensor >> file_found_message

if __name__ == '__main__':
    dag.clear(reset_dag_runs=True)
    dag.run()  # this triggers a backfill job

Recommended answer

This is working for me as expected. I can set breakpoints at the DAG level or inside the python callables' definitions, and step through them with the VS Code debugger.

I'm using the same debug settings you provided, but I changed the parameter reset_dag_runs=True to dag_run_state=State.NONE in the dag.clear() call, as specified on the DebugExecutor docs page. I believe this changed in one of the latest releases.
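
Concretely, the __main__ block at the bottom of the DAG file becomes the following (the same pattern appears in the full example below):

if __name__ == '__main__':
    from airflow.utils.state import State

    # clear previous task instances and set existing DagRuns back to
    # State.NONE (the Airflow 2.x signature), then run the DAG in-process
    dag.clear(dag_run_state=State.NONE)
    dag.run()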

Regarding backfills, I'm setting catchup=False in the DAG arguments (it works both ways). Important note: I'm running Airflow version 2.0.0.

Here is an example using the same code as example_xcom.py, which ships with the default installation:

Debug settings:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "internalConsole",
            "justMyCode": false,
            "env":{
                "AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
                "AIRFLOW__DEBUG__FAIL_FAST": "True",
            }
        }
    ]
}
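
As a side note, the env block above is what actually switches Airflow over to the DebugExecutor. If you want to run the DAG file with plain python outside VS Code, a minimal sketch (assuming the standard AIRFLOW__SECTION__KEY naming for Airflow config overrides) is to set the same variables at the very top of the file, before any airflow import:

import os

# Must run before the first airflow import, because Airflow reads its
# configuration (including the executor choice) at import time.
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__DEBUG__FAIL_FAST"] = "True"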

Example DAG:

import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'excom_xample',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
    catchup=False
)

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    logging.info("log before PUSH")  # <<<<<<<<<<< Before landing on breakpoint
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and
        check if the pushed values match the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("PRINT Line after breakpoint ")  # <<<< After landing on breakpoint
    if pulled_value_1 != value_1:
        raise ValueError("The two values differ"
                         f"{pulled_value_1} and {value_1}")

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    if pulled_value_2 != value_2:
        raise ValueError(
            f'The two values differ {pulled_value_2} and {value_2}')

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=['push', 'push_by_returning'])
    if pulled_value_1 != value_1:
        raise ValueError(
            f'The two values differ {pulled_value_1} and {value_1}')
    if pulled_value_2 != value_2:
        raise ValueError(
            f'The two values differ {pulled_value_2} and {value_2}')


push1 = PythonOperator(
    task_id='push',
    dag=dag,
    python_callable=push,
)

push2 = PythonOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=push_by_returning,
)

pull = PythonOperator(
    task_id='puller',
    dag=dag,
    python_callable=puller,
)

pull << [push1, push2]

if __name__ == '__main__':
    from airflow.utils.state import State
    dag.clear(dag_run_state=State.NONE)
    dag.run()
