气流:如何扩展SubDagOperator? [英] Airflow: how to extend SubDagOperator?

查看:190
本文介绍了气流:如何扩展SubDagOperator?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我尝试扩展airflow API中提供的SubDagOperator时,气流Web服务器GUI无法将其识别为SubDagOperator,从而使我无法放大到subdag。

When I try to extend the SubDagOperator provided in airflow API, airflow webserver GUI does not recognize it as SubDagOperator thereby disabling me to zoom in to the subdag.

如何我可以在扩展SubDagOperator的同时保留将其放大为subdag的功能吗?我缺少什么吗?

How can I extend the SubDagOperator while preserving the ability to zoom in to it as a subdag? Am I missing something?

推荐答案

请参见下面的示例,了解如何扩展SubDagOperator。您的案例中的关键是覆盖task_type函数

Please see the example below on how to extend the SubDagOperator. The key in your case was to override the task_type function

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.decorators import apply_defaults


class ExampleSubdagSubclassOperator(SubDagOperator):
  template_fields = ()
  template_ext = ()

  @apply_defaults
  def __init__(self, *args, **kwargs):

    dag = kwargs.get('dag')
    task_id = kwargs.get('task_id')

    subdag = DAG(
        '{}.{}'.format(dag.dag_id, task_id),
        schedule_interval=dag.schedule_interval,
        start_date=dag.start_date
    )

    # Replace the following 3 lines with code to automatically generate the desired tasks in the subdag
    t1 = DummyOperator(dag=subdag, task_id='t1')
    t2 = DummyOperator(dag=subdag, task_id='t2')
    t3 = DummyOperator(dag=subdag, task_id='t3')

    super(ExampleSubdagSubclassOperator, self).__init__(subdag=subdag, *args, **kwargs)

  # This property needs to be overridden so that the airflow UI recognises the task as a subdag and enables
  # the "Zoom into Sub Dag" button
  @property
  def task_type(self):
      return 'SubDagOperator'

这篇关于气流:如何扩展SubDagOperator?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆