Airflow - call an operator inside a function

Question

I'm trying to run an operator that is created inside a function, calling that function from a PythonOperator. It seems I've missed something; can someone help me find out what?

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago


dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0

}

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
        dag=dag)
    return t1



with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
 
    python_task

Error:

[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING - start_date for <Task(BashOperator): ttest-task> isn't datetime.datetime
[2020-10-10 09:34:10,700] {taskinstance.py:1150} ERROR - '>' not supported between instances of 'Pendulum' and 'str'
Traceback (most recent call last):
  File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/root/airflow/dags/estdag.py", line 19, in postgres_to_gcs
    dag=dag)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 101, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 423, in __init__
    self.dag = dag
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 549, in dag
    dag.add_task(self)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 1325, in add_task
    task.start_date = max(task.start_date, self.start_date)
TypeError: '>' not supported between instances of 'Pendulum' and 'str'
[2020-10-10 09:34:10,702] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=python_dag, task_id=python_task, execution_date=20201010T093407, start_date=20201010T093410, end_date=20201010T093410
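The last frame of the traceback boils down to a plain Python comparison: max() is asked to order a date object against a string, and Python refuses. The sketch below reproduces only that comparison with the standard library; plain datetime stands in for Pendulum, and the helper name later_start_date is hypothetical (it just mirrors the failing line task.start_date = max(task.start_date, self.start_date)). It does not explain why the task's start_date ended up as a string.

```python
from datetime import datetime


def later_start_date(task_start, dag_start):
    """Hypothetical helper mirroring the failing line in dag.add_task:
    pick the later of two start dates with max()."""
    return max(task_start, dag_start)


dag_start = datetime(2018, 1, 1)

# Both arguments are datetimes: the comparison works.
print(later_start_date(datetime(2020, 10, 10), dag_start))  # 2020-10-10 00:00:00

# The task's start date is a string: ordering str against datetime fails.
try:
    later_start_date("2018-01-01", dag_start)
except TypeError as exc:
    print("TypeError:", exc)
```

The second call fails with a message of the same form as the log, with datetime.datetime in place of Pendulum.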

One workaround suggested by Racooneer (but the issue is still there):

Thanks, Racooneer!!!

Removing default_args helped to solve it, but I'm not able to see the bash command's output.

Recommended answer

I'm not exactly sure what you are trying to do, but the code in your Python function doesn't actually execute the operator.

This should work:

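# Airflow 1.10-style import paths, as used in the question.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def postgres_to_gcs():
    # Constructing the operator only runs its __init__; nothing executes yet.
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo task1',
        xcom_push=True            # Note: there is no dag=dag here!
    )
    # Run the command now; an empty dict stands in for the task context.
    t1.execute(dict())


with DAG(
        'python_dag',
        description='Python DAG',
        schedule_interval='*/15 * * * *',
        start_date=datetime(2018, 1, 1),
        catchup=False
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=postgres_to_gcs
    )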

Note that operators are Python classes. When you create an operator inside a Python function, you are only calling the class constructor, which initializes the object. To actually run the operator, you need to call its execute method.
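The constructor/execute split can be seen without Airflow installed. The sketch below uses a hypothetical ToyOperator class (not part of Airflow, and not Airflow's real BaseOperator API) that only imitates that split; the class, its message parameter and the two helper functions are illustrative assumptions.

```python
class ToyOperator:
    """Hypothetical stand-in for an Airflow operator (not Airflow's real API).

    Like a real operator, the constructor only stores configuration;
    the work happens only when execute(context) is called.
    """

    def __init__(self, task_id, message):
        self.task_id = task_id
        self.message = message
        self.executed = False  # becomes True only inside execute()

    def execute(self, context):
        self.executed = True
        output = f"{self.task_id}: {self.message}"
        print(output)
        return output


def construct_only():
    # Like the question's callable: builds the operator but never runs it.
    return ToyOperator(task_id="count_lines", message="task1")


def construct_and_execute():
    # Like the answer's callable: builds the operator, then runs it.
    op = ToyOperator(task_id="count_lines", message="task1")
    return op.execute(context={})


op = construct_only()
print(op.executed)                # False: only __init__ ran
result = construct_and_execute()  # execute() prints "count_lines: task1"
```

The object returned by construct_only() is fully configured but has done no work; only the explicit execute() call produces output.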
