Conditional Tasks using PythonOperator and BranchPythonOperator


Question


    Hi guys, I am new to Airflow and Python. I need to run tasks based on the value of a variable in the input JSON. If the value of the variable 'insurance' is "true", then task1, task2, task3 need to run; otherwise task4, task5, task6 need to run. Since I am new to this, I don't have much idea about the usage of PythonOperator & BranchPythonOperator.

    This is my input json:

    {
      "car": {
        "engine_no": "123_st_456",
        "json": "{\"make\":\"Honda\",\"model\": Jazz, \"insurance\":\"true\",\"pollution\":\"true\" }"
      }
    }
    

    The code is given below:

    from airflow import DAG
    from datetime import datetime
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators import PythonOperator
    import logging
    import json
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False
    }
    
    dag = DAG('DAG_NAME',default_args=default_args,schedule_interval=None,max_active_runs=5, start_date=datetime(2020, 8, 4))   
    
    PythonOperator(
       task_id = 'sample_task',
       python_callable = 'sample_fun',
       op_kwargs = {
           json  : '{{ dag_run.car.json}}'
       },
       provide_context=True,
       dag = dag
    )
    
    def sample_fun( json,**kwargs):
      insurance_flag = json.dumps(json)['insurance']
    
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo 1'
    )
    
    task2 = BashOperator(
        task_id='task2',
        bash_command='echo 2'
    )
    
    task3 = BashOperator(
        task_id='task3',
        bash_command='echo 3'
    ) 
    
    task4 = BashOperator(
        task_id='task4',
        bash_command='echo 4'
    )  
    
    task5 = BashOperator(
        task_id='task5',
        bash_command='echo 5'
    )
    
    task6 = BashOperator(
        task_id='task6',
        bash_command='echo 6'
    ) 
    
    
    if insurance_flag == "true":
        task1.dag = dag
        task2.dag = dag
        task3.dag = dag
        task1 >> task2 >> task3
        
    else:
        task4.dag = dag
        task5.dag = dag
        task6.dag = dag
        task4 >> task5 >> task6
    

    Solution

    Primary problem in your code

    1. The dag-definition file is continuously parsed by Airflow in the background, and the generated DAGs & tasks are picked up by the scheduler. The way your file wires tasks together creates several problems

      • all 6 tasks (task1 .. task6) are ALWAYS created (and hence they will always run, irrespective of insurance_flag); just their inter-task dependency is set in accordance with insurance_flag

      • the correct way instead is to put both the task instantiation (creation of the BashOperator taskn objects) and the task wiring inside that if .. else block. That way, the unnecessary tasks won't be created (and hence won't run)

    2. While point 1. above alone should be enough to fix your code, may I offer you a suggestion for improvement: having a Variable read in the dag-definition file means a SQL query is fired by Airflow's SQLAlchemy ORM very frequently in the background (on every cycle of continuously parsing the dag-definition file)

      • this not only unnecessarily overloads your SQLAlchemy backend meta-db, but also slows down the parser (in the extreme case it can lead to a DagBag timeout if parsing starts taking too long)
      • instead you can leverage BranchPythonOperator in the right way to move that Variable read to runtime (when the DAG / tasks are actually run) rather than DAG-generation time (when the dag-file is parsed by Airflow and the DAG is generated); here is the code for that (and you should do away with that if-else block completely)

      """ branch 1 """
      task1 >> task2 >> task3
      """ branch 2 """
      task4 >> task5 >> task6
      
      def branch_decider(**kwargs):
          # executed at runtime, so the Variable is read when the task runs,
          # not while the dag-file is being parsed
          my_var_dict = Variable.get('my_var_name', deserialize_json=True)
          # decide which branch to take based on insurance flag
          # (note: if the nested 'json' field is stored as a JSON-encoded string,
          # as in the question's input, it needs json.loads before indexing)
          if my_var_dict['car']['json']['insurance']:
              return 'task1'
          else:
              return 'task4'
      
      # needs: from airflow.models import Variable
      #        from airflow.operators.python_operator import BranchPythonOperator
      branch_task = BranchPythonOperator(task_id='branch_task',
                                         dag=dag,
                                         python_callable=branch_decider)

      # wire the branching task upstream of the first task of each branch,
      # so that the branch whose task_id is not returned gets skipped
      branch_task >> task1
      branch_task >> task4
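
    For reference, here is one way all of the above could fit together in a single dag-definition file. This is only a minimal consolidated sketch: it assumes Airflow 1.10.x import paths and that the Variable my_var_name stores the question's input JSON, whose nested "json" field is itself a JSON-encoded string (hence the extra json.loads and the explicit comparison against the string "true").

      from datetime import datetime
      import json

      from airflow import DAG
      from airflow.models import Variable
      from airflow.operators.bash_operator import BashOperator
      from airflow.operators.python_operator import BranchPythonOperator

      default_args = {'owner': 'airflow', 'depends_on_past': False}

      dag = DAG('DAG_NAME',                      # DAG id reused from the question
                default_args=default_args,
                schedule_interval=None,
                max_active_runs=5,
                start_date=datetime(2020, 8, 4))

      def branch_decider(**kwargs):
          # runs at execution time, so nothing is read from the meta-db at parse time
          car = Variable.get('my_var_name', deserialize_json=True)['car']
          # the nested "json" field is a JSON-encoded string in the question's
          # input, so it needs a second json.loads
          details = json.loads(car['json'])
          # the flag is the string "true"/"false", so compare explicitly
          return 'task1' if details.get('insurance') == 'true' else 'task4'

      branch_task = BranchPythonOperator(task_id='branch_task',
                                         python_callable=branch_decider,
                                         dag=dag)

      # all six tasks are created, but only the chosen chain runs;
      # the tasks of the other chain are marked as skipped
      task1, task2, task3, task4, task5, task6 = [
          BashOperator(task_id='task{}'.format(i),
                       bash_command='echo {}'.format(i),
                       dag=dag)
          for i in range(1, 7)
      ]

      branch_task >> task1 >> task2 >> task3
      branch_task >> task4 >> task5 >> task6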
      


    Other (minor) problems in your code

    • Missing the mandatory dag argument in the task instantiations

         task1 = BashOperator(
           task_id='task1',
           bash_command='echo 1',
           dag=dag
         )
      

    • a dangling PythonOperator whose callable just json.dumps its input and serves no purpose (unless I misunderstood your code / intent here, remove it completely)

      PythonOperator(
          task_id='sample_task',
          python_callable=sample_fun,
          op_kwargs={
              json: '{{ dag_run.car.json}}'
          },
          provide_context=True,
          dag=dag
      )
      
      
      def sample_fun(json, **kwargs):
          insurance_flag = json.dumps(json)['insurance']
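
    If the intent behind that PythonOperator was actually to read the JSON passed at trigger time (the question templates {{ dag_run.car.json }}), a possible alternative is to have branch_decider read dag_run.conf at runtime instead of an Airflow Variable. A minimal sketch, assuming Airflow 1.10.x, that the dag object is already defined, and that the DAG is triggered with the question's JSON as conf (e.g. airflow trigger_dag DAG_NAME -c '<that JSON>'):

      import json

      from airflow.operators.python_operator import BranchPythonOperator

      def branch_decider(**context):
          # dag_run.conf holds whatever JSON was passed with `trigger_dag -c ...`
          conf = context['dag_run'].conf or {}
          # the nested "json" field is a JSON-encoded string, so decode it
          details = json.loads(conf['car']['json'])
          return 'task1' if details.get('insurance') == 'true' else 'task4'

      branch_task = BranchPythonOperator(task_id='branch_task',
                                         python_callable=branch_decider,
                                         provide_context=True,  # 1.10.x: pass dag_run into kwargs
                                         dag=dag)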
      


    UPDATE-1

    Responding to queries raised in the comments

    We have used Variable.get('my_var_name'). What is this my_var_name?

    Variables have a key & a value; my_var_name is the key of the variable (see the Key column on the Variables page under Admin in the Airflow UI).
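
    For instance, the variable can be created from the Airflow UI, the CLI, or programmatically. A minimal sketch of the programmatic route, assuming the key my_var_name and a value mirroring the question's input (serialize_json=True stores it as JSON so that deserialize_json=True can read it back as a dict):

      from airflow.models import Variable

      # store the question's input under the key 'my_var_name';
      # the inner "json" field is kept as a JSON-encoded string, as in the question
      Variable.set(
          'my_var_name',
          {
              "car": {
                  "engine_no": "123_st_456",
                  "json": '{"make": "Honda", "model": "Jazz", '
                          '"insurance": "true", "pollution": "true"}',
              }
          },
          serialize_json=True,
      )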


    If the condition is satisfied, return 'task1', 'task2', 'task3', else 'task4', 'task5', 'task6'. Can we return more than 1 task?

    No, you can't (and you don't have to).

    BranchPythonOperator requires that its python_callable return only the task_id of the first task of the chosen branch

    • 1st branch: task1, task2, task3, first task's task_id = task1
    • 2nd branch: task4, task5, task6, first task's task_id = task4

    Furthermore, understand that since the above two sets of tasks have already been wired together, they will naturally be executed one after another in that sequence (otherwise what would be the point of wiring them anyway?)

    task1 >> task2 >> task3
    


