Airflow BigQueryInsertJobOperator configuration


Question

I'm having some issue converting from the deprecated BigQueryOperator to BigQueryInsertJobOperator. I have the task below:

bq_extract = BigQueryInsertJobOperator(
    dag="big_query_task",
    task_id='bq_query',
    gcp_conn_id='google_cloud_default',
    params={'data': Utils().querycontext},
    configuration={
        "query": {"query": "{% include 'sql/bigquery.sql' %}", "useLegacySql": False,
                  "writeDisposition": "WRITE_TRUNCATE", "destinationTable": {"datasetId": bq_dataset}}
    })

This line in my bigquery_extract.sql query is throwing the error:

{% for field in data.bq_fields %}

I want to use 'data' from params, which calls a method that reads from a .json file:

# imports inferred from the usage below
import json
from os import path

from airflow.configuration import conf
from airflow.models import Variable


class Utils():
    bucket = Variable.get('s3_bucket')
    _qcontext = None

    @property
    def querycontext(self):
        if self._qcontext is None:
            self.load_querycontext()

        return self._qcontext

    def load_querycontext(self):
        with open(path.join(conf.get("core", "dags"), 'traffic/bq_query.json')) as f:
            self._qcontext = json.load(f)

The bq_query.json is in this format, and I need to use the nested bq_fields list values:

{
"bq_fields": [
    { "name": "CONCAT(ID, '-', CAST(ID AS STRING))", "alias": "new_id" },
    { "name": "TIMESTAMP(CAST(visitStartTime * 1000 AS INT64))", "alias": "new_timestamp" },
    { "name": "TO_JSON_STRING(hits.experiment)", "alias": "hit_experiment" }]
}
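The failure can be reproduced outside Airflow with plain jinja2 (a hedged sketch; the field values below are taken from the JSON above, and the template is a simplified stand-in for the SQL file):

```python
import jinja2

# Simplified stand-in for the loop in bigquery.sql
line = ("{% for field in data.bq_fields %}"
        "{{ field.name }} AS {{ field.alias }}"
        "{% if not loop.last %}, {% endif %}"
        "{% endfor %}")
tmpl = jinja2.Template(line)

# Rendering without `data` in the context reproduces the DAG's error
try:
    tmpl.render()
except jinja2.exceptions.UndefinedError as e:
    print(e)  # 'data' is undefined

# Supplying `data` makes the same template render cleanly
fields = {"bq_fields": [{"name": "TO_JSON_STRING(hits.experiment)",
                         "alias": "hit_experiment"}]}
print(tmpl.render(data=fields))  # TO_JSON_STRING(hits.experiment) AS hit_experiment
```

So the template itself is fine; the problem is that `data` never reaches the Jinja context that Airflow uses to render the SQL file.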

This file has a list which I want to use in the above-mentioned query line, but it throws this error:

jinja2.exceptions.UndefinedError: 'data' is undefined

Answer

There are two issues with your code.

First, "params" is not a supported field in BigQueryInsertJobOperator. See this post, where I show how to pass params to a SQL file when using BigQueryInsertJobOperator: How do you pass variables with BigQueryInsertJobOperator in Airflow
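One workaround (a hedged sketch, assuming Airflow 2.x behavior) is to pass the dict through the DAG's `user_defined_macros`, which Airflow merges into the Jinja globals before rendering templated fields. Plain jinja2 can mimic that merge:

```python
from jinja2 import Environment

# Hypothetical macro mapping, as it could be passed to
# DAG(..., user_defined_macros={"data": Utils().querycontext})
macros = {"data": {"bq_fields": [{"name": "hits.page.pagePath",
                                  "alias": "page_path"}]}}

env = Environment()
env.globals.update(macros)  # Airflow merges user_defined_macros the same way

rendered = env.from_string(
    "{% for field in data.bq_fields %}"
    "{{ field.name }} AS {{ field.alias }}"
    "{% endfor %}"
).render()
print(rendered)  # hits.page.pagePath AS page_path
```

With the macro registered at the DAG level, `data` is available in every templated field of every task, so the include in the operator's configuration renders without passing anything per-task.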

Second, if you happen to get an error that your file cannot be found, make sure you set the full path to your file. I had to do this when migrating from local testing to the cloud, even though the file was in the same directory. You can set the path in the DAG config as in the example below (replace the path with your own):

with DAG(
    ...
    template_searchpath='/opt/airflow/dags',
    ...
) as dag:
