使用插件导入DAG时出现气流错误-只能在操作员之间设置关系 [英] Airflow error importing DAG using plugin - Relationships can only be set between Operators
问题描述
我编写了一个气流插件,其中仅包含一个自定义运算符(以支持BigQuery中的CMEK)。我可以创建一个简单的DAG,其中包含使用此运算符并且可以正常执行的单个任务。
I have written an airflow plugin that simply contains one custom operator (to support CMEK in BigQuery). I can create a simple DAG with a single task that uses this operator and that executes fine.
但是,如果我尝试在DAG中创建一个从DummyOperator任务到我的自定义运算符任务的依赖项,DAG将无法在UI中加载并抛出以下错误,而我无法理解为什么会引发此错误?
However if I try and create a dependency in the DAG from a DummyOperator task to my custom operator task the DAG fails to load in the UI and throws the following error and I can't understand why this error is being thrown?
损坏的DAG:[/ home / airflow / gcs / dags / js_bq_custom_plugin_v2.py]只能在运营商之间设置关系;收到BQCMEKOperator
Broken DAG: [/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py] Relationships can only be set between Operators; received BQCMEKOperator
到目前为止,我已经在composer-1.4.2-airflow-1.9.0,composer-1.4.2-上对此进行了测试airflow-1.10.0和composer-1.4.1-airflow-1.10.0。
I have tested this so far on composer-1.4.2-airflow-1.9.0, composer-1.4.2-airflow-1.10.0 and composer-1.4.1-airflow-1.10.0.
每个任务的运行气流测试均已成功完成。
Running airflow test for each of the tasks completes without error.
在DAG中单独使用它很好(如下所示),所以我不认为插件有天生的错误
Using it in isolation in a DAG works fine (as below) so I don't believe there is anything inherently wrong with the plugin
import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
default_dag_args = {
'start_date': datetime.datetime(2019,1,1),
'retries': 0
}
dag = DAG(
'js_bq_custom_plugin',
schedule_interval=None,
catchup=False,
concurrency=1,
max_active_runs=1,
default_args=default_dag_args)
run_this = BQCMEKOperator(
task_id = 'cmek_plugin_test',
sql = 'select * from ds.foo LIMIT 15',
project = 'xxx',
dataset = 'js_dev',
table = 'cmek_test10',
key = 'xxx',
dag = dag
)
如果我引入了DummyOperator和依赖项,则会发生错误
Whereas if I introduce a DummyOperator and dependency then the error occurs
import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from airflow.operators.dummy_operator import DummyOperator
default_dag_args = {
'start_date': datetime.datetime(2019,1,1),
'retries': 0
}
dag = DAG(
'js_bq_custom_plugin_v2',
schedule_interval=None,
catchup=False,
concurrency=1,
max_active_runs=1,
default_args=default_dag_args)
etl_start = DummyOperator(task_id='etl_start', dag=dag)
extract = BQCMEKOperator(
task_id = 'extract',
sql = 'select * from foo.bar LIMIT 15',
project = 'xxx',
dataset = 'js_dev',
table = 'cmek_test5',
key = 'xxx',
dag = dag
)
etl_start.set_downstream(extract)
运算符本身很简单,我可以使用以下最简单的自定义运算符重现该问题
The operator itself is straightforward and I can reproduce the issue with the simplest custom operator such as the one below
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class TestOperator(BaseOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(TestOperator, self).__init__(*args, **kwargs)
def execute(self, context):
logging.info("Executed by TestOperator")
在 init .py
from airflow.plugins_manager import AirflowPlugin
from test_plugin.operators.test_operator import TestOperator
class TestPlugin(AirflowPlugin):
name = "test_plugin"
operators = [TestOperator]
hooks = []
executors = []
macros = []
admin_views = []
flask_blueprints = []
menu_links = []
在models.py中查看了产生此错误的气流代码,它使用isinstance(t,BaseOperator),当我仅在python中运行它时,使用我的自定义运算符为我的任务返回了true,所以我不知道发生了什么? / p>
Also having looked at the airflow code in models.py that generates this error it uses isinstance(t, BaseOperator) and this returns true for my task using my custom operator when I just run it in python so I have no idea what is going on?
for t in task_list:
if not isinstance(t, BaseOperator):
raise AirflowException(
"Relationships can only be set between "
"Operators; received {}".format(t.__class__.__name__))
推荐答案
在composer-1.4.2版本中引入了一个错误,该错误现已修复,请尝试创建一个新的Composer环境,DAG错误应消失,同时,我们还将在现有的1.4上自动应用该修复程序。接下来几天将有2种环境。
There was a bug introduced in composer-1.4.2 release which we have fixed by now, try create a new Composer environment and that DAG error should go away. Meanwhile, we'll also apply that fix automatically on existing 1.4.2 environments over the next few days.
这篇关于使用插件导入DAG时出现气流错误-只能在操作员之间设置关系的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!