气流Python操作员返回类型 [英] Airflow Python Operator with a. return type
问题描述
我的DAG中有一个python运算符。 python可调用函数返回布尔值。但是,当我运行DAG时,出现以下错误。
TypeError:'bool'对象不可调用
我修改了该函数以使其不返回任何内容,但随后再次出现以下错误
错误-'NoneType'对象不是可调用
下面是我的爸爸
def check_poke(阈值,sleep_interval):
flag =雪花_poke(1000,10).poke()
#print(flag)
返回标志
依赖= PythonOperator(
task_id ='poke_check',
#python_callable = check_poke(129600,600),
Provide_context = True,
python_callable = check_poke(129600,600),
dag = dag)
end = BatchEndOperator(
queue = QUEUE,
dag = dag)
start.set_downstream(dependency)
Dependency.set_downstream(end)
无法弄清楚我想念的是什么。有人可以帮我解决这个问题吗?是气流的新手。
我在dag中编辑了python运算符,如下所示
依赖关系= PythonOperator(
task_id ='poke_check',
Provide_context = True,
python_callable = check_poke(129600,600),
dag = dag)
但是现在,我得到了另一个错误。
Traceback(最近一次通话最近):
File /usr/local/lib/python2.7/dist-packages/airflow/ models.py,第1245行,正在运行
结果= task_copy.execute(context = context)
文件 /usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator .py,第66行,在执行
中,return_value = self.python_callable(* self.op_args,** self.op_kwargs)
TypeError:()不接受任何参数(给定25)
[ 2019-02-15 05:30:25,375] {models.py:1298}信息-将任务标记为UP_FOR_RETRY
[2019-02-15 05:30:25,393] {models.py:1327}错误-( )不带参数(给定25个)
参数名称将其删除。您正在传递调用而不是可调用的结果。
python_callable = check_poke(129600,600)
第二个错误指出可调用对象使用25个参数进行调用。因此, lambda:
不起作用。下面的方法会起作用,但是忽略25个参数确实是有问题的。
python_callable = lambda * args,** kwargs:check_poke(129600, 600)
I have a python operator in my DAG. The python callable function is returning a bool value. But, when I run the DAG, I get the below error.
TypeError: 'bool' object is not callable
I modified the function to return nothing but then again I keep getting the below error
ERROR - 'NoneType' object is not callable
Below is my dag
def check_poke(threshold,sleep_interval):
flag=snowflake_poke(1000,10).poke()
#print(flag)
return flag
dependency = PythonOperator(
task_id='poke_check',
#python_callable=check_poke(129600,600),
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)
end = BatchEndOperator(
queue=QUEUE,
dag=dag)
start.set_downstream(dependency)
dependency.set_downstream(end)
Not able to figure out what it is that I am missing. Can someone help me out on this...Fairly new to airflow.
I edited the python operator in the dag as below
dependency = PythonOperator(
task_id='poke_check',
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)
But now, I get a different error.
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1245, in run
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 66, in execute
return_value = self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: () takes no arguments (25 given)
[2019-02-15 05:30:25,375] {models.py:1298} INFO - Marking task as UP_FOR_RETRY
[2019-02-15 05:30:25,393] {models.py:1327} ERROR - () takes no arguments (25 given)
The argument name gives it away. You are passing the result of a call rather than a callable.
python_callable=check_poke(129600,600)
The second error states that the callable is called with 25 arguments. So a lambda:
won't work. The following would work but ignoring 25 arguments is really questionable.
python_callable=lambda *args, **kwargs: check_poke(129600,600)
这篇关于气流Python操作员返回类型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!