动态任务生成中的意外气流行为 [英] Unexpected Airflow behaviour in dynamic task generation

查看:19
本文介绍了动态任务生成中的意外气流行为的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

出于我可以接受的原因,我尝试在每次迭代中动态生成具有不同 execution_date_fnExternalTask​​Sensor 任务.提供给 execution_date_fn 的可调用 kwarg 需要将 dt 作为输入并提供 execution_date 作为输出,我将其写为 lambda 函数,例如lambda dt: get_execution_date(i).

For reasons acceptable to me, I am trying to dynamically generate ExternalTaskSensor tasks with different execution_date_fn in each iteration. Callable provided to execution_date_fn kwarg requires to have dt as input and provide execution_date as output, which I am writing down as a lambda function, e.g. lambda dt: get_execution_date(i).

我注意到在循环中作为 lambda 函数提供的 execution_date_fn 会导致意外行为 - 所有生成的任务都具有相同的 execution_date

I noticed that execution_date_fn provided as a lambda function in a loop results in unexpected behaviour - all generated tasks have the same execution_date

我注意到这种行为不是 ExternalTask​​Sensor 固有的,而是源自其他地方.可以在此示例中看到此行为:

I noticed that this behaviour is not intrinsic to ExternalTaskSensor but originates somewhere else. This behaviour can be seen in this example:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG(
    'test_lambda',
    schedule_interval=None,
    start_date=datetime(2021,1,1),
    catchup=False
)

for task_id in ['task1', 'task2']:
    task = PythonOperator(
        task_id='printer_'+task_id,
        python_callable=lambda: print(task_id),
        dag=dag
    )

这会导致任务 printer_task1printer_task2 在日志中打印 'task2'.

This results in both tasks printer_task1 and printer_task2 printing 'task2' in logs.

我设法通过将传感器实例化到函数中来纠正行为:

I have managed to correct the behaviour by moving sensor instantiation into a function:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def create_task(task_id):
    task = PythonOperator(
        task_id='printer_'+task_id,
        python_callable=lambda: print(task_id),
        dag=dag
    )
    return task

dag = DAG(
    'test_lambda',
    schedule_interval=None,
    start_date=datetime(2021,1,1),
    catchup=False
)

for task_id in ['task1', 'task2']:
    task = create_task(task_id)

在这种情况下,任务 printer_task1 在日志中打印 'task1'printer_task2 打印 'task2'.

In this case task printer_task1 prints 'task1' and printer_task2 prints 'task2' in the logs.

我很想知道为什么我会观察到这种行为?

I would be interested to know why am I observing such behaviour?

免责声明:我知道向 PythonOperator 提供参数的正常方法是通过 op_args kwarg.Lambda 函数仅用于提供示例,因为在使用 execution_date_fn 时,ExternalTask​​Sensor 中没有 op_args 选项.

DISCLAIMER: I am aware that normal way to provide arguments to a PythonOperator is via op_args kwarg. Lambda functions were used solely to provide an example as op_args option is not available in ExternalTaskSensor when using execution_date_fn.

这是一个 lambda 问题,而不是特定于 Airflow 的问题.官方 Python 文档有一个关于这个问题的主题:https://docs.python.org/3/faq/programming.html#why-do-lambdas-defined-in-a-具有不同值的循环所有返回相同结果

This is a lambda issue and not Airflow-specific. Official Python documentation has a topic on the issue: https://docs.python.org/3/faq/programming.html#why-do-lambdas-defined-in-a-loop-with-different-values-all-return-the-same-result

推荐答案

这与 Airflow 关系不大,它是一个 lambda 问题:

This has little to do with Airflow, it is a lambda issue:

>>> ls = [lambda: i for i in [1,2]]
>>> ls[0]()
2
>>> ls[1]()
2

要知道为什么会这样,我建议阅读那个 Stackoverflow 帖子 可能会比我更好地解释为什么

To know why it does that, I recommend reading that Stackoverflow post that will probably explains why better than I could

这篇关于动态任务生成中的意外气流行为的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆