Airflow - call an operator inside a function
Problem description
I'm trying to use a PythonOperator to call a function that creates another operator inside it. It seems I'm missing something; can someone help me find out what?
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago

dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0
}

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
        dag=dag)
    return t1

with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
    python_task
Error:
[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING - start_date for <Task(BashOperator): ttest-task> isn't datetime.datetime
[2020-10-10 09:34:10,700] {taskinstance.py:1150} ERROR - '>' not supported between instances of 'Pendulum' and 'str'
Traceback (most recent call last):
  File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/root/airflow/dags/estdag.py", line 19, in postgres_to_gcs
    dag=dag)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 101, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 423, in __init__
    self.dag = dag
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 549, in dag
    dag.add_task(self)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 1325, in add_task
    task.start_date = max(task.start_date, self.start_date)
TypeError: '>' not supported between instances of 'Pendulum' and 'str'
[2020-10-10 09:34:10,702] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=python_dag, task_id=python_task, execution_date=20201010T093407, start_date=20201010T093410, end_date=20201010T093410
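The last frame of the traceback fails on max(task.start_date, self.start_date). Below is a plain-Python sketch of that comparison, assuming (as the WARNING line hints) that the task's start_date ended up as a string while the DAG's start_date was a real date. It uses the standard datetime in place of Pendulum, and merge_start_dates plus the sample values are hypothetical, not Airflow code:

```python
from datetime import datetime

# Hypothetical values: the DAG has a real datetime start_date, while the task's
# start_date is (wrongly) a plain string, as the WARNING line suggests.
dag_start_date = datetime(2018, 1, 1)
task_start_date = "2018-01-01"


def merge_start_dates(task_start, dag_start):
    # Same shape as the failing line in dag.py: keep the later of the two dates.
    return max(task_start, dag_start)


try:
    merge_start_dates(task_start_date, dag_start_date)
except TypeError as exc:
    # Python cannot order a datetime against a str, so max() raises.
    print(exc)

# With two real datetimes the comparison works.
print(merge_start_dates(datetime(2020, 1, 1), dag_start_date))  # 2020-01-01 00:00:00
```

This only shows why mixing a string and a datetime breaks max(); it does not establish where the string start_date came from in the asker's setup.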
One workaround suggested by Racooneer (but the issue is still there):
Thanks, Racooneer!!!
Removing default_args helped solve it, but I'm not able to see the bash command output.
Recommended answer
I'm not exactly sure what you are trying to do, but the code you posted in the Python function doesn't actually execute the operator.
This should work:
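from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo task1',
        xcom_push=True  # Note: there is no dag=dag here!
    )
    # Constructing the operator does not run it; calling execute() does.
    t1.execute(dict())

with DAG(
    'python_dag',
    description='Python DAG',
    schedule_interval='*/15 * * * *',
    start_date=datetime(2018, 1, 1),
    catchup=False
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=postgres_to_gcs
    )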
Note that operators are Python classes. When you create an operator inside a Python function, you are only calling the class constructor. To actually run the operator, you need to call its execute method.
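To illustrate the constructor-versus-execute distinction without needing an Airflow install, here is a minimal plain-Python sketch. SketchOperator is a hypothetical stand-in, not Airflow's BaseOperator or BashOperator, and its command argument (a Python callable instead of a bash string) is an assumption made for illustration:

```python
class SketchOperator:
    """Hypothetical stand-in for an Airflow operator (NOT Airflow's class).

    It only mimics the split between configuring a task (__init__)
    and running it (execute).
    """

    def __init__(self, task_id, command):
        # Constructing the object only records configuration; nothing runs.
        self.task_id = task_id
        self.command = command
        self.executed = False

    def execute(self, context):
        # The actual work happens here, and only when execute() is called.
        self.executed = True
        return self.command(context)


def postgres_to_gcs():
    t1 = SketchOperator(task_id="count_lines", command=lambda ctx: "task1")
    # At this point t1.executed is still False: the constructor ran nothing.
    return t1.execute(dict())


print(postgres_to_gcs())  # prints "task1"
```

The same pattern applies to the answer above: BashOperator(...) only builds the task object, and t1.execute(dict()) is the call that runs the bash command.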