Writing nested schema to BigQuery from Dataflow (Python)
Problem description
I have a Dataflow job that writes to BigQuery. It works well for a non-nested schema, but fails for a nested one.
Here is my Dataflow pipeline:
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
schema = 'url: STRING,' \
         'ua: STRING,' \
         'method: STRING,' \
         'man: RECORD,' \
         'man.ip: RECORD,' \
         'man.ip.cc: STRING,' \
         'man.ip.city: STRING,' \
         'man.ip.as: INTEGER,' \
         'man.ip.country: STRING,' \
         'man.res: RECORD,' \
         'man.res.ip_dom: STRING'
first = p | 'read' >> ReadFromText(wordcount_options.input)
second = (first
          | 'process' >> beam.ParDo(processFunction())
          | 'write' >> beam.io.WriteToBigQuery(
              'myBucket:tableFolder.test_table',
              schema=schema)
          )
I created the BigQuery table using the following schema:
[
{
"mode": "NULLABLE",
"name": "url",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "ua",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "method",
"type": "STRING"
},
{
"mode": "REPEATED",
"name": "man",
"type": "RECORD",
"fields":
[
{
"mode": "REPEATED",
"name": "ip",
"type": "RECORD",
"fields":
[
{
"mode": "NULLABLE",
"name": "cc",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "city",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "as",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "country",
"type": "STRING"
}
]
},
{
"mode": "REPEATED",
"name": "res",
"type": "RECORD",
"fields":
[
{
"mode": "NULLABLE",
"name": "ip_dom",
"type": "STRING"
}
]
}
]
}
]
I get the following error:
BigQuery creation of import job for table "test_table" in dataset "tableFolder" in project "myBucket" failed., BigQuery execution failed., HTTP transport error:
Message: Invalid value for: url is not a valid value
HTTP Code: 400
Question: Can someone please guide me? What am I doing wrong? Also, if there is a better way to iterate through the nested schema and write to BigQuery, please suggest it.
Additional information: my data file:
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"DK","city":"munlan","as":4865,"country":"United"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"GET","man":{"ip":{"cc":"BS","city":"sind","as":7655,"country":"India"},"res":{"ip_dom":"v1"}}}
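The processFunction used in the pipeline above is not shown; presumably it parses each newline-delimited JSON line into a Python dict whose nested man object mirrors the RECORD structure of the schema. A minimal stdlib-only sketch of that parsing step (the function name is hypothetical):

```python
import json

def parse_line(line):
    """Parse one newline-delimited JSON record into a dict.

    The nested 'man' object maps directly onto the RECORD
    fields of the BigQuery schema.
    """
    return json.loads(line)

record = parse_line(
    '{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT",'
    '"man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},'
    '"res":{"ip_dom":"v1"}}}'
)
# Nested fields arrive as plain nested dicts, e.g. record["man"]["ip"]["cc"]
```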
Answer
The problem with your code is that you try to use nested fields while specifying the BigQuery table schema as a string, which is not supported. In order to push nested records into BigQuery from Apache Beam you need to create a TableSchema object, e.g. using the built-in parser:
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
table_schema = parse_table_schema_from_json(your_bigquery_json_schema)
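For reference, the JSON string that parse_table_schema_from_json expects is an object with a top-level "fields" key, where each RECORD field nests its children under its own "fields" key. A sketch of that shape for the table in the question, built with the standard json module (modes taken from the schema above):

```python
import json

# Shape expected by parse_table_schema_from_json: {"fields": [...]},
# with RECORD fields nesting their children under their own "fields" key.
your_bigquery_json_schema = json.dumps({
    "fields": [
        {"name": "url", "type": "STRING", "mode": "NULLABLE"},
        {"name": "ua", "type": "STRING", "mode": "NULLABLE"},
        {"name": "method", "type": "STRING", "mode": "NULLABLE"},
        {"name": "man", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "ip", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "cc", "type": "STRING", "mode": "NULLABLE"},
                {"name": "city", "type": "STRING", "mode": "NULLABLE"},
                {"name": "as", "type": "INTEGER", "mode": "NULLABLE"},
                {"name": "country", "type": "STRING", "mode": "NULLABLE"},
            ]},
            {"name": "res", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "ip_dom", "type": "STRING", "mode": "NULLABLE"},
            ]},
        ]},
    ]
})
# table_schema = parse_table_schema_from_json(your_bigquery_json_schema)
```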
You need to pass the schema as a JSON string there; you can obtain it with the following command in your terminal (assuming you have the gcloud tools installed):
bq --project=your-gcp-project-name --format=json show your.table.name > schema.json
and in Python use it as follows:
table_schema = parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))["schema"]))
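The bq show --format=json output wraps the schema in a top-level "schema" key, which is why the one-liner indexes ["schema"] before re-serializing. A sketch using an inline stand-in for schema.json (the file contents here are abbreviated and hypothetical; the real output has more keys):

```python
import json

# Stand-in for the JSON that `bq show --format=json` writes to schema.json;
# the real output also carries tableReference, numRows, etc.
bq_show_output = '''
{
  "kind": "bigquery#table",
  "schema": {
    "fields": [
      {"name": "url", "type": "STRING", "mode": "NULLABLE"},
      {"name": "man", "type": "RECORD", "mode": "REPEATED",
       "fields": [{"name": "ip", "type": "RECORD", "mode": "REPEATED",
                   "fields": [{"name": "cc", "type": "STRING", "mode": "NULLABLE"}]}]}
    ]
  }
}
'''

# Same extraction, reading from a string instead of open("schema.json"):
schema_json = json.dumps(json.loads(bq_show_output)["schema"])
# table_schema = parse_table_schema_from_json(schema_json)
```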
Then in your pipeline:
beam.io.WriteToBigQuery(
'myBucket:tableFolder.test_table',
schema=table_schema)
You can also take a look at the example showing manual creation of a TableSchema object: https://github.com/apache/beam/blob/474345f5987e47a22d063c7bfcb3638c85a57e64/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
(example from the link):
from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()

# A simple top-level field
full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)

# A nested field: child fields are appended to the record's own
# fields list, and the record is appended to the table schema once.
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)

table_schema.fields.append(phone_number_schema)
then just use the table_schema variable in beam.io.WriteToBigQuery.