How to ignore an unknown column when loading to BigQuery using Airflow?


Problem description

I am using the GoogleCloudStorageToBigQueryOperator.

The JSON file may have more columns than the ones I defined. In that case, I want the load job to continue and simply ignore the unrecognized columns. I tried the ignore_unknown_values argument, but it made no difference.

My operator:

def dc():
    # Schema for the destination BigQuery table (truncated in the question).
    return [
        {
            "name": "id",
            "type": "INTEGER",
            "mode": "NULLABLE"
        },
        {
            "name": "storeId",
            "type": "INTEGER",
            "mode": "NULLABLE"
        },
        ...
    ]
gcs_to_bigquery_st = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_to_BigQuery_stage',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_st,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    ignore_unknown_values=True,
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)

The error:

u'Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: shippingService.',

Which is true: shippingService doesn't exist in the schema, and it won't be added to the table.
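
For illustration, a source row like the following (with hypothetical values) would trigger this failure, since shippingService is not among the fields returned by dc():

{"id": 1, "storeId": 5, "shippingService": "UPS"}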

How can I fix this?

I removed schema_fields=dc() from the operator:

gcs_to_bigquery_st = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_to_BigQuery_stage',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_st,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    ignore_unknown_values=True,
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)

It still gives the same error. This doesn't make sense... the operator is being told to ignore unknown values :(

Recommended answer

The only reason I can think of is that you are probably using Airflow 1.9. This feature was added in Airflow 1.10.
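
If in doubt, a quick way to confirm which version you are running (a minimal check, assuming a standard Python install of Airflow):

import airflow
print(airflow.__version__)  # ignore_unknown_values on this operator needs 1.10+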

However, you can achieve the same thing in Airflow 1.9 by adding src_fmt_configs={'ignoreUnknownValues': True}:

gcs_to_bigquery_st = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_to_BigQuery_stage',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_st,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    src_fmt_configs={'ignoreUnknownValues': True},
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)
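
Outside of Airflow, ignoreUnknownValues is a standard option on BigQuery load job configurations, which is what src_fmt_configs carries here. For comparison, a minimal sketch of the same load done directly with the google-cloud-bigquery client (the project, dataset, table, and GCS URI below are placeholders, not values from the question):

from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    ignore_unknown_values=True,  # drop fields that are not in the schema
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema=[
        bigquery.SchemaField("id", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("storeId", "INTEGER", mode="NULLABLE"),
    ],
)

load_job = client.load_table_from_uri(
    "gs://my-bucket/export.json",       # placeholder URI
    "my-project.my_dataset.my_table",   # placeholder table
    job_config=job_config,
)
load_job.result()  # rows load; unknown fields such as shippingService are ignored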
