Writing nested schema to BigQuery from Dataflow (Python)


Problem description

I have a Dataflow job that writes to BigQuery. It works well for a non-nested schema, but fails for a nested schema.

Here is my Dataflow pipeline:

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)

  schema = 'url: STRING,' \
           'ua: STRING,' \
           'method: STRING,' \
           'man: RECORD,' \
           'man.ip: RECORD,' \
           'man.ip.cc: STRING,' \
           'man.ip.city: STRING,' \
           'man.ip.as: INTEGER,' \
           'man.ip.country: STRING,' \
           'man.res: RECORD,' \
           'man.res.ip_dom: STRING'

  first = p | 'read' >> ReadFromText(wordcount_options.input)
  second = (first
            | 'process' >> (beam.ParDo(processFunction()))
            | 'write' >> beam.io.WriteToBigQuery(
              'myBucket:tableFolder.test_table',
              schema=schema)
  )

I created the BigQuery table using the following schema:

[
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ua",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "method",
    "type": "STRING"
  },
  {
    "mode": "REPEATED",
    "name": "man",
    "type": "RECORD",
    "fields":
      [
        {
          "mode": "REPEATED",
          "name": "ip",
          "type": "RECORD",
          "fields":
            [
              {
                "mode": "NULLABLE",
                "name": "cc",
                "type": "STRING"
              },
              {
                "mode": "NULLABLE",
                "name": "city",
                "type": "STRING"
              },
              {
                "mode": "NULLABLE",
                "name": "as",
                "type": "INTEGER"
              },
              {
                "mode": "NULLABLE",
                "name": "country",
                "type": "STRING"
              }
            ]
        },
        {
          "mode": "REPEATED",
          "name": "res",
          "type": "RECORD",
          "fields":
            [
              {
                "mode": "NULLABLE",
                "name": "ip_dom",
                "type": "STRING"
              }
            ]
        }
      ]
  }
]

I get the following error:

BigQuery creation of import job for table "test_table" in dataset "tableFolder" in project "myBucket" failed., BigQuery execution failed., HTTP transport error:
 Message: Invalid value for: url is not a valid value
 HTTP Code: 400

Question: Can someone please guide me? What am I doing wrong? Also, if there is a better way to iterate through the nested schema and write it to BigQuery, please suggest it.

Additional information: my data file:

{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"DK","city":"munlan","as":4865,"country":"United"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"GET","man":{"ip":{"cc":"BS","city":"sind","as":7655,"country":"India"},"res":{"ip_dom":"v1"}}}

Recommended answer

The problem with your code is that you specify nested fields while passing the BigQuery table schema as a string, which is not supported. To push nested records into BigQuery from Apache Beam, you need to create a TableSchema object, for example by using the built-in parser:

from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
table_schema = parse_table_schema_from_json(your_bigquery_json_schema)

You need to pass the schema to it as a JSON string. You can obtain that string with the following command in your terminal (I assume that you have the gcloud tools installed):

bq --project_id=your-gcp-project-name --format=json show your.table.name > schema.json

Then, in Python, use it as follows:

import json
table_schema = parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))["schema"]))
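As a rough sketch of what that line does, the snippet below uses only the standard library. It assumes that the file written by bq show is a JSON object whose "schema" key holds an object of the form {"fields": [...]}, which is the shape parse_table_schema_from_json reads. The sample content is a hypothetical, trimmed stand-in for a real schema.json, not actual bq output.

```python
import json

# Hypothetical, trimmed stand-in for the contents of schema.json
# as written by `bq show --format=json` (assumption, not real output).
bq_show_output = """
{
  "id": "myBucket:tableFolder.test_table",
  "schema": {
    "fields": [
      {"mode": "NULLABLE", "name": "url", "type": "STRING"},
      {"mode": "REPEATED", "name": "man", "type": "RECORD",
       "fields": [
         {"mode": "REPEATED", "name": "res", "type": "RECORD",
          "fields": [
            {"mode": "NULLABLE", "name": "ip_dom", "type": "STRING"}
          ]}
       ]}
    ]
  }
}
"""

# Keep only the "schema" object and serialize it back to a JSON string.
schema_json = json.dumps(json.loads(bq_show_output)["schema"])

# schema_json is the string that would be handed to
# parse_table_schema_from_json(schema_json).
top_level_names = [f["name"] for f in json.loads(schema_json)["fields"]]
print(top_level_names)
```

The rest of the bq show output (table id, timestamps and so on) is dropped, so only the field definitions reach the parser.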

Then, in your pipeline:

beam.io.WriteToBigQuery(
    'myBucket:tableFolder.test_table',
    schema=table_schema)
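One further point may be worth checking; it is an observation about the question's data, not something the answer states. The table declares man, ip and res with mode REPEATED, and BigQuery represents REPEATED fields as arrays, while each line of the data file holds a single object for them. The following is a minimal sketch of a hypothetical helper, wrap_repeated, that wraps those objects in lists before a row is written. Whether this is needed depends on the modes actually used in your table.

```python
import json

def wrap_repeated(row):
    """Return a copy of row with the man, ip and res objects wrapped in lists.

    Hypothetical helper: the field names are taken from the question's schema.
    """
    man = dict(row["man"])
    man["ip"] = [man["ip"]]
    man["res"] = [man["res"]]
    out = dict(row)
    out["man"] = [man]
    return out

# First line of the data file from the question.
line = ('{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT",'
        '"man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},'
        '"res":{"ip_dom":"v1"}}}')

row = json.loads(line)
wrapped = wrap_repeated(row)
print(json.dumps(wrapped, indent=2))
```

The helper copies the dictionaries it changes, so the parsed input row is left untouched.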

You can also take a look at the example that shows how to create a TableSchema object manually: https://github.com/apache/beam/blob/474345f5987e47a22d063c7bfcb3638c85a57e64/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py

(Example from the link):

from apache_beam.io.gcp.internal.clients import bigquery
table_schema = bigquery.TableSchema()
full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)

# A nested field
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'
number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)

# A second field nested under phoneNumber
area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)
table_schema.fields.append(phone_number_schema)

Then just pass the table_schema variable to beam.io.WriteToBigQuery.
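On the question of iterating through a nested schema, one possible approach is to derive the JSON schema from a sample record instead of writing every field by hand. The snippet below is a minimal sketch of that idea using only the standard library. The helper infer_fields and the type mapping are hypothetical, every field is assumed to be NULLABLE, and lists and null values are not handled.

```python
import json

# Assumed mapping from Python types to BigQuery type names.
# bool comes first because bool is a subclass of int in Python.
SCALAR_TYPES = [(bool, "BOOLEAN"), (int, "INTEGER"),
                (float, "FLOAT"), (str, "STRING")]

def infer_fields(record):
    """Recursively build a BigQuery-style field list from a sample dict."""
    fields = []
    for name, value in record.items():
        if isinstance(value, dict):
            # A nested dict becomes a RECORD with its own field list.
            fields.append({"name": name, "type": "RECORD", "mode": "NULLABLE",
                           "fields": infer_fields(value)})
            continue
        for py_type, bq_type in SCALAR_TYPES:
            if isinstance(value, py_type):
                fields.append({"name": name, "type": bq_type,
                               "mode": "NULLABLE"})
                break
        else:
            raise TypeError("unsupported value for field %r: %r" % (name, value))
    return fields

# First line of the data file from the question.
line = ('{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT",'
        '"man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},'
        '"res":{"ip_dom":"v1"}}}')

schema_json = json.dumps({"fields": infer_fields(json.loads(line))})
print(schema_json)
```

The resulting schema_json string has the {"fields": [...]} shape, so it could be passed to parse_table_schema_from_json in place of the file-based schema. A schema inferred from one record only reflects the fields and types present in that record, so it is worth comparing against the real table before relying on it.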
