Passing the result of a PythonOperator as a parameter to BigQueryInsertJobOperator
Question
My DAG contains a PythonOperator and a BigQueryInsertJobOperator. The value returned by the PythonOperator should be passed to the BigQueryInsertJobOperator through its `params` field.
Below is the script I am running.
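```python
from airflow import models
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# default_args and project_id are defined elsewhere in the script.


def get_columns():
    field = "name"
    return field


with models.DAG(
    "xcom_test",
    default_args=default_args,
    schedule_interval="0 0 * * *",
    tags=["xcom"],
) as dag:
    t1 = PythonOperator(task_id="get_columns", python_callable=get_columns, do_xcom_push=True)
    t2 = BigQueryInsertJobOperator(
        task_id="bigquery_insert",
        project_id=project_id,
        configuration={
            "query": {
                "query": "{% include 'xcom_query.sql' %}",
                "useLegacySql": False,
            }
        },
        force_rerun=True,
        # provide_context=True was dropped: it is not a BigQueryInsertJobOperator
        # argument, and Airflow 2 rejects unknown operator arguments.
        params={
            "columns": "{{ ti.xcom_pull(task_ids='get_columns') }}",
            "project_id": project_id,
        },
    )
```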
The `xcom_query.sql` file looks like this:
```sql
INSERT INTO `{{ params.project_id }}.test.xcom_test`
{{ params.columns }}
select 'Andrew' from `{{ params.project_id }}.test.xcom_test`
```
When this runs, the `columns` param is not rendered. The literal Jinja expression is inserted into the query as a plain string, which causes an error. This is how the query was rendered:
```sql
INSERT INTO `project.test.xcom_test`
{{ ti.xcom_pull(task_ids='get_columns') }}
select 'Andrew' from `project.test.xcom_test`
```
Any idea what I am missing?
Answer
I found the reason why my DAG is failing.
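The `params` field of BigQueryInsertJobOperator is not a templated field. It is not listed in the operator's `template_fields`, so the `{{ ti.xcom_pull(...) }}` string stored in it is never rendered. When the SQL file later uses `{{ params.columns }}`, Jinja substitutes that stored string verbatim and does not render it a second time.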
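This rule can be modelled without Airflow. The following is a toy sketch, not Airflow's implementation:

- Python's `string.Template` stands in for Jinja.
- The hypothetical `ToyOperator` class, with its own `template_fields` tuple, stands in for a real operator.

Only the listed fields are rendered, and rendering is a single pass. As a result, a template expression stored inside `params` comes out verbatim, while one referenced directly in the rendered field is resolved.

```python
from string import Template


class ToyOperator:
    """Hypothetical stand-in for an Airflow operator; not Airflow's real code."""

    # Only attributes named here are rendered, like an operator's
    # `template_fields`. `params` is deliberately not listed.
    template_fields = ("query",)

    def __init__(self, query, params):
        self.query = query
        self.params = params

    def render(self, context):
        # string.Template identifiers cannot contain dots, so each param
        # is exposed as $params_<key>; the values are plain strings.
        variables = dict(context)
        variables.update({f"params_{k}": v for k, v in self.params.items()})
        for field in self.template_fields:
            raw = getattr(self, field)
            # One substitution pass: a substituted value that itself looks
            # like a template is inserted verbatim, never rendered again.
            setattr(self, field, Template(raw).safe_substitute(variables))
        return self


# Stand-in for the runtime context, where the XCom value is available.
context = {"xcom_get_columns": "(name)"}

# Like the question: the XCom lookup is hidden inside params.
failing = ToyOperator(
    query="INSERT INTO t $params_columns SELECT 'Andrew'",
    params={"columns": "$xcom_get_columns"},
).render(context)

# Like the answer: the rendered field references the context value directly.
working = ToyOperator(
    query="INSERT INTO t $xcom_get_columns SELECT 'Andrew'",
    params={},
).render(context)

print(failing.query)  # INSERT INTO t $xcom_get_columns SELECT 'Andrew'
print(working.query)  # INSERT INTO t (name) SELECT 'Andrew'
```

The `failing` case mirrors the rendered query in the question, where the expression survives as literal text.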
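Instead, you can access the `task_instance` variable directly in the Jinja template. The SQL file is rendered because it is included from `configuration`, which is a templated field. Note that in an `INSERT` statement the column list also needs parentheses: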
```sql
INSERT INTO `{{ params.project_id }}.test.xcom_test`
({{ task_instance.xcom_pull(task_ids='get_columns') }})
select 'Andrew' from `{{ params.project_id }}.test.xcom_test`
```
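This fix works because `get_columns` returns a single string, which the template wraps in parentheses. If the callable returned a list instead, Jinja would render it with Python's `str()`, producing `['name', 'age']`, which is not valid SQL.

One option, sketched here as an assumption rather than part of the original answer, is to let the callable return the finished column list. `format_column_list` is a hypothetical helper. With it, the template should drop its own parentheses and use `{{ task_instance.xcom_pull(task_ids='get_columns') }}` on its own.

```python
def format_column_list(columns):
    """Hypothetical helper: build an INSERT column list like "(`name`, `age`)"."""
    # Accept a single column name as well as a list of names.
    if isinstance(columns, str):
        columns = [columns]
    if not columns:
        raise ValueError("at least one column is required")
    for column in columns:
        # A backtick inside a name would break the quoting below, so reject it.
        if "`" in column:
            raise ValueError(f"invalid column name: {column!r}")
    # Quote each identifier with backticks and join them inside parentheses.
    return "(" + ", ".join(f"`{column}`" for column in columns) + ")"


def get_columns():
    # The returned string is pushed to XCom and can be inserted into the
    # template verbatim, without extra parentheses around it.
    return format_column_list(["name"])


print(get_columns())                        # (`name`)
print(format_column_list(["name", "age"]))  # (`name`, `age`)
```

Returning a preformatted string keeps the SQL template simple. The trade-off is that the Python task now owns the SQL formatting.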
https://marclamberti.com/blog/templates-macros-apache-airflow/ - This article explains how to identify which parameters are templated in Airflow.