Airflow: how to use trigger parameters in functions

Problem description

We are using Airflow's KubernetesPodOperator for our data pipelines. What we would like to add is the option to pass in parameters via the UI.

We currently keep the operator's parameters in separate YAML files, and instead of instantiating the operator directly we call a function that does some preparation and returns the operator, like this:

def prep_kubernetes_pod_operator(yaml):

    # ... read yaml and extract params

    return KubernetesPodOperator(**params)

with DAG(...):
    
    task1 = prep_kubernetes_pod_operator(yaml)

This works well for us and keeps our DAG files lightweight, but now we would like to be able to pass some extra params via the UI. I understand that the trigger params can be accessed via kwargs['dag_run'].conf, but I haven't managed to pull them into the Python function.
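A likely reason the helper never sees the trigger parameters: a function like prep_kubernetes_pod_operator runs at DAG-parse time, which is not tied to any particular DAG run, so there is no dag_run.conf to read yet. The conf only appears in the context Airflow hands to a task at execution time. The sketch below models the two phases in plain Python; prep_task, the YAML values and the SimpleNamespace standing in for a DAG run are hypothetical, not Airflow APIs.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict

# Hypothetical model, NOT Airflow code: it only contrasts parse time with execution time.


def prep_task(yaml_params: Dict[str, Any]) -> Callable[[Dict[str, Any]], Dict[str, Any]]:
    # "Parse time": this body runs when the DAG file is loaded.
    # No DAG run is involved yet, so only the static YAML values are known.
    static = dict(yaml_params)

    def execute(context: Dict[str, Any]) -> Dict[str, Any]:
        # "Execution time": the context carries the triggering DAG run,
        # so its conf (if any) can override the static values.
        return {**static, **(context["dag_run"].conf or {})}

    return execute


task = prep_task({"image": "my-image:1.0", "cmds": ["run"]})  # parse time
context = {"dag_run": SimpleNamespace(conf={"cmds": ["run", "--full"]})}
print(task(context))  # execution time
```

The point of the design is that anything depending on the trigger has to be deferred into code Airflow calls with a context: an operator's execute, a python_callable, or a templated field.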

I also tried creating a custom operator, since an operator does receive these arguments, but I couldn't manage to call the KubernetesPodOperator from its execute method (and I suspect calling an operator inside another operator is not the right solution anyway).

Update:

Following NicoE's advice, I started to extend the KubernetesPodOperator instead.

The problem I have now is that when I parse the YAML and assign the arguments afterwards, the attributes inherited from the parent operator become tuples, which raises a TypeError.

DAG:

task = NewKPO(
    task_id="task1",
    yaml_path=yaml_path)

Operator:

class NewKPO(KubernetesPodOperator):
    @apply_defaults
    def __init__(
            self,
            yaml_path: str,
            name: str = "default",
            *args,
            **kwargs) -> None:
        self.yaml_path = yaml_path
        self.name = name
        super(NewKPO, self).__init__(
            name=name,  # DAG is not parsed without this line - 'key has to be string'
            *args,
            **kwargs)

    def execute(self, context):
        # parsing yaml and adding context["dag_run"].conf (...)
        self.name = yaml.name
        self.image = yaml.image
        self.secrets = yaml.secrets
        # (...) if I run type(self.secrets) here, I get a tuple
        return super(NewKPO, self).execute(context)
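The parsing code is elided, so the exact cause can't be confirmed from the snippet. One common way an attribute silently becomes a tuple in Python is a trailing comma on an assignment, which is easy to pick up when copying keyword arguments such as `image=...,` into attribute assignments. A minimal illustration with a hypothetical class:

```python
class Demo:
    """Hypothetical class, only used to show the trailing-comma pitfall."""

    def __init__(self, cfg: dict) -> None:
        # Trailing comma: the right-hand side becomes a one-element tuple.
        self.image = cfg["image"],
        # No trailing comma: a plain string, as intended.
        self.name = cfg["name"]


d = Demo({"image": "busybox", "name": "task1"})
print(type(d.image).__name__, type(d.name).__name__)  # tuple str
```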

Recommended answer

You could use params, a dictionary that can be defined at the DAG level and remains accessible in every task. It works for every operator derived from BaseOperator, and its values can also be set from the UI when triggering a DAG run.

The following example shows how to use it with different operators. params can be defined in the default_args dict or passed as an argument to the DAG object.

default_args = {
    "owner": "airflow",
    'params': {
        "param1": "first_param",
        "param2": "second_param"
    }
}

dag = DAG(
    dag_id="example_dag_params",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=['example_dags'],
    catchup=False
)

When triggering this DAG from the UI, you can add an extra param in the configuration JSON, for example {"param3": "param_from_the_UI"}.

Params can be accessed in templated fields, as in the BashOperator case:

with dag:

    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='echo bash_task: {{ params.param1 }}')

bash_task log output:

{bash.py:158} INFO - Running command: echo bash_task: first_param
{bash.py:169} INFO - Output:
{bash.py:173} INFO - bash_task: first_param
{bash.py:177} INFO - Command exited with return code 0

Params are also accessible in the execution context, for example in a python_callable:


    def _print_params(**kwargs):
        print(f"Task_id: {kwargs['ti'].task_id}")
        for k, v in kwargs['params'].items():
            print(f"{k}:{v}")

    python_task = PythonOperator(
        task_id='python_task',
        python_callable=_print_params,
    )

Output:

{logging_mixin.py:104} INFO - Task_id: python_task
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI

You can also add params in the task-level definition:

    python_task_2 = PythonOperator(
        task_id='python_task_2',
        python_callable=_print_params,
        params={'param4': 'param defined at task level'}
    )

Output:

{logging_mixin.py:104} INFO - Task_id: python_task_2
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param4:param defined at task level
{logging_mixin.py:104} INFO - param3:param_from_the_UI
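The order of the log lines above suggests how the values are layered: DAG-level params first, then task-level params, then the values from the trigger conf. In Airflow 2.x that last step is, to our understanding, controlled by the `[core] dag_run_conf_overrides_params` option. Below is a simplified plain-Python model of the layering, not Airflow's actual implementation; effective_params is a hypothetical helper.

```python
from typing import Any, Dict


def effective_params(dag_params: Dict[str, Any],
                     task_params: Dict[str, Any],
                     run_conf: Dict[str, Any],
                     conf_overrides_params: bool = True) -> Dict[str, Any]:
    # Hypothetical model: later layers win on key clashes,
    # and new keys are appended in insertion order.
    merged = {**dag_params, **task_params}
    if conf_overrides_params:
        merged.update(run_conf)
    return merged


params = effective_params(
    {"param1": "first_param", "param2": "second_param"},  # DAG level
    {"param4": "param defined at task level"},            # task level
    {"param3": "param_from_the_UI"},                      # trigger conf
)
for k, v in params.items():
    print(f"{k}:{v}")
```

Printed in order, the keys come out as param1, param2, param4, param3, matching the log above.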

Following the same example, you could define a custom operator that inherits from BaseOperator:

class CustomDummyOperator(BaseOperator):

    @apply_defaults
    def __init__(self, custom_arg: str = 'default', *args, **kwargs) -> None:
        self.custom_arg = custom_arg
        super(CustomDummyOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(f"Task_id: {self.task_id}")
        print(f"custom_arg: {self.custom_arg}")
        for k, v in context['params'].items():
            print(f"{k}:{v}")

The task would be:

    custom_op_task = CustomDummyOperator(
        task_id='custom_operator_task'
    )

Output:

{logging_mixin.py:104} INFO - Task_id: custom_operator_task
{logging_mixin.py:104} INFO - custom_arg: default
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
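Applied to the question's NewKPO, the same idea means reading context['params'] (which, as the output above shows, also carries the values passed from the UI) inside execute, assigning the overrides to the operator's attributes and then delegating to the parent's execute rather than creating a second operator. The sketch below shows that pattern in plain Python so it runs without Airflow: FakePodOperator is a hypothetical stand-in for KubernetesPodOperator, and the overridable attribute names are assumptions. With the real operator, templated fields are rendered before execute is called, so values assigned there are not re-rendered.

```python
from typing import Any, Dict, List, Optional


class FakePodOperator:
    """Hypothetical stand-in for KubernetesPodOperator; NOT the real class."""

    def __init__(self, task_id: str, image: str = "default",
                 cmds: Optional[List[str]] = None) -> None:
        self.task_id = task_id
        self.image = image
        self.cmds = cmds or []

    def execute(self, context: Dict[str, Any]) -> str:
        # The real operator would launch a pod here; this just describes it.
        return f"{self.task_id}: {self.image} {' '.join(self.cmds)}"


class ParamAwarePodOperator(FakePodOperator):
    """Overrides attributes from context['params'] at run time, then delegates."""

    # Hypothetical whitelist of attributes a trigger may override.
    overridable = ("image", "cmds")

    def execute(self, context: Dict[str, Any]) -> str:
        params = context.get("params", {})
        for key in self.overridable:
            if key in params:
                # Plain assignment (no trailing comma), so no accidental tuples.
                setattr(self, key, params[key])
        # Call the parent's execute instead of instantiating a second operator.
        return super().execute(context)


op = ParamAwarePodOperator(task_id="task1", image="busybox", cmds=["echo", "hi"])
print(op.execute({"params": {"cmds": ["echo", "from-ui"]}}))
```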

Imports:

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults

Hope that works for you!
