How to ignore an unknown column when loading to BigQuery using Airflow?
Question
I am using GoogleCloudStorageToBigQueryOperator.
The JSON file may contain more columns than the schema I defined. In that case I want the load job to continue and simply ignore the unrecognized columns. I tried the ignore_unknown_values argument, but it made no difference.
My operator:
def dc():
    return [
        {
            "name": "id",
            "type": "INTEGER",
            "mode": "NULLABLE"
        },
        {
            "name": "storeId",
            "type": "INTEGER",
            "mode": "NULLABLE"
        },
        ...
    ]

gcs_to_bigquery_st = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_to_BigQuery_stage',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_st,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    ignore_unknown_values=True,
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)
The error:
u'Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: shippingService.',
Which is true: shippingService doesn't exist and it won't be added to the table.
How can I fix this?
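To clarify what I expect to happen, here is a pure-Python sketch of the behavior I want from ignore_unknown_values (this is only an illustration, not BigQuery code; the schema and row below are made-up examples):

```python
# Sketch of the expected behavior: fields missing from the schema are
# silently dropped rather than failing the whole load.
schema = {"id", "storeId"}
row = {"id": 1, "storeId": 7, "shippingService": "UPS"}

def apply_schema(row, schema, ignore_unknown_values):
    """Drop unknown fields when ignoring, else fail like BigQuery does."""
    unknown = set(row) - schema
    if unknown and not ignore_unknown_values:
        raise ValueError("No such field: " + sorted(unknown)[0])
    return {k: v for k, v in row.items() if k in schema}

print(apply_schema(row, schema, ignore_unknown_values=True))
# {'id': 1, 'storeId': 7}
```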
I removed schema_fields=dc() from the operator:
gcs_to_bigquery_st = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_to_BigQuery_stage',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_st,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    ignore_unknown_values=True,
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)
It still gives the same error. This doesn't make sense; the operator is explicitly told to ignore unknown values :(
Answer
The only reason I can think of is that you are probably using Airflow 1.9. This feature was added in Airflow 1.10.
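As a quick sanity check, you can test whether an installed version is new enough. This is a hedged sketch; it assumes a plain "major.minor[.patch]" version string such as the ones Airflow 1.x releases use:

```python
# Sketch: decide whether an Airflow version string supports the
# operator's ignore_unknown_values parameter (added in 1.10).
# Assumes a "major.minor[.patch]" format.
def supports_ignore_unknown_values(version):
    major, minor = (int(part) for part in version.split(".")[:2])
    return (major, minor) >= (1, 10)

print(supports_ignore_unknown_values("1.9.0"))   # False
print(supports_ignore_unknown_values("1.10.2"))  # True
```

In a running DAG you would pass airflow.__version__ to this check.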
However, you can use it in Airflow 1.9 by adding src_fmt_configs={'ignoreUnknownValues': True}:
gcs_to_bigquery_st = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_to_BigQuery_stage',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_st,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    src_fmt_configs={'ignoreUnknownValues': True},
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)
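The reason this works: src_fmt_configs is passed through to the BigQuery load-job configuration, where the REST API expects the camelCase key ignoreUnknownValues. Here is a minimal pure-Python sketch of that pass-through; it mimics the idea of what the Airflow hook does, not its actual code:

```python
# Sketch of how a src_fmt_configs dict could be merged into a BigQuery
# load-job configuration. Key names follow the BigQuery REST API
# (camelCase); build_load_config itself is an illustrative helper.
def build_load_config(source_format, src_fmt_configs=None):
    config = {
        "sourceFormat": source_format,
        "writeDisposition": "WRITE_APPEND",
        "createDisposition": "CREATE_IF_NEEDED",
    }
    # Format-specific options are passed straight through, so keys the
    # operator's own arguments don't cover still reach the API.
    config.update(src_fmt_configs or {})
    return config

cfg = build_load_config("NEWLINE_DELIMITED_JSON",
                        {"ignoreUnknownValues": True})
print(cfg["ignoreUnknownValues"])  # True
```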