Passing Result of a Python Operator as a parameter to BigQueryInsertJobOperator

Question

I have a PythonOperator and a BigQueryInsertJobOperator in my DAG. The value returned by the PythonOperator should be passed to the BigQueryInsertJobOperator through its params field.

Below is the script I am running.

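from airflow import models
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


def get_columns():
    # The return value is pushed to XCom for downstream tasks
    field = "name"
    return field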


with models.DAG(
        "xcom_test",
        default_args=default_args,
        schedule_interval="0 0 * * *",
        tags=["xcom"],
) as dag:
    t1 = PythonOperator(task_id="get_columns", python_callable=get_columns, do_xcom_push=True)

    t2 = BigQueryInsertJobOperator(
        task_id="bigquery_insert",
        project_id=project_id,
        configuration={
            "query": {
                "query": "{% include 'xcom_query.sql' %}",
                "useLegacySql": False,
            }
        },
        force_rerun=True,
        provide_context=True,
        params={
            "columns": "{{ ti.xcom_pull(task_ids='get_columns') }}",
            "project_id": project_id
        },
    )

xcom_query.sql looks like this:

INSERT INTO `{{ params.project_id }}.test.xcom_test`
    {{  params.columns }}
select 'Andrew' from `{{ params.project_id }}.test.xcom_test`

When this runs, the columns param is not rendered: the template expression is inserted into the query as a literal string, which causes an error. This is the query that was produced:

 INSERT INTO `project.test.xcom_test`
  {{ ti.xcom_pull(task_ids='get_columns') }}
 select 'Andrew' from `project.test.xcom_test`

Any idea what I am missing?

Recommended answer

I found the reason why my DAG was failing.

The "params" field of BigQueryInsertJobOperator is not a templated field, so a "task_instance.xcom_pull" call placed inside it is never rendered. It reaches the query as a literal string, as the output above shows.

Instead, you can access the 'task_instance' variable directly from the Jinja template.

INSERT INTO `{{ params.project_id }}.test.xcom_test`
    ({{ task_instance.xcom_pull(task_ids='get_columns') }})
select
'Andrew' from `{{ params.project_id }}.test.xcom_test`
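
To illustrate why moving the call into the SQL file helps, here is a toy sketch in plain Python. It is not Jinja or Airflow code: render_once, fake_xcom_pull, and the context keyed by expression text are hypothetical stand-ins. The only assumption carried over is that a template is rendered in a single pass and that substituted values are not rendered again.

```python
import re

# Matches "{{ expr }}" and captures the expression text without padding.
PLACEHOLDER = re.compile(r"\{\{\s*(.*?)\s*\}\}")


def render_once(template: str, context: dict) -> str:
    """Toy single-pass renderer (hypothetical, not Jinja).

    Each {{ expr }} is replaced by context[expr]. Callables are invoked.
    Substituted text is inserted verbatim and never scanned again.
    """
    def substitute(match):
        value = context[match.group(1)]
        return str(value() if callable(value) else value)

    return PLACEHOLDER.sub(substitute, template)


XCOM_EXPR = "ti.xcom_pull(task_ids='get_columns')"


def fake_xcom_pull():
    # Hypothetical stand-in for the value returned by the get_columns task.
    return "name"


context = {
    # What the question did: the expression is stored as a plain params value.
    "params.columns": "{{ " + XCOM_EXPR + " }}",
    # What the template engine can evaluate when it sees the expression itself.
    XCOM_EXPR: fake_xcom_pull,
}

# Expression hidden behind params: it comes out as a literal string.
indirect = render_once("INSERT INTO t ({{ params.columns }})", context)

# Expression written directly in the template: it is evaluated.
direct = render_once("INSERT INTO t ({{ " + XCOM_EXPR + " }})", context)

print(indirect)
print(direct)
```

The first line printed still contains the unrendered expression, matching the failing query in the question. The second contains the column name, matching the fixed SQL file.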

https://marclamberti.com/blog/templates-macros-apache-airflow/ - This article explains how to identify templated parameters in Airflow.
