Dataflow Streaming Job - Error writing to BigQuery


Problem description

I'm running into an error when writing to BigQuery from an Apache Beam Dataflow job using the 'FILE_LOADS' method. The streaming insert (the else block) works fine and as expected, while 'FILE_LOADS' (the if block) fails with the error shown after the code. The temporary files in the GCS bucket are valid JSON objects.

Sample raw events from Pub/Sub:

"{'event': 'test', 'entityId': 13615316690, 'eventTime': '2020-08-12T15:56:07.130899+00:00', 'targetEntityId': 8947793, 'targetEntityType': 'item', 'entityType': 'guest', 'properties': {}}" 
 
"{'event': 'test', 'entityId': 13615316690, 'eventTime': '2020-08-12T15:56:07.130899+00:00', 'targetEntityId': 8947793, 'targetEntityType': 'item', 'entityType': 'guest', 'properties': {‘action’: ‘delete’}}"  

from __future__ import absolute_import

import logging
import sys
import traceback
import argparse
import ast
import json
import datetime
import dateutil.parser as date_parser

import apache_beam as beam
import apache_beam.pvalue as pvalue
from google.cloud.bigquery import CreateDisposition, WriteDisposition
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

def get_values(element):
    # convert properties from dict to arr of dicts to form a repeatable bq table record
    prop_list = [{'property_name': k, 'property_value': v} for k, v in element['properties'].items()]
    date_parsed = date_parser.parse(element.get('eventTime'))
    event_time = date_parsed.strftime('%Y-%m-%d %H:%M:00')
    
    raw_value = {'event': element.get('event'),
                 'entity_type': element.get('entityType'),
                 'entity_id': element.get('entityId'),
                 'target_entity_type': element.get('targetEntityType'),
                 'target_entity_id': element.get('targetEntityId'),
                 'event_time': event_time,
                 'properties': prop_list
                 }

    return raw_value

def stream_to_bq(c: dict):
    argv = [
        f'--project={c["PROJECT"]}',
        f'--runner=DataflowRunner',
        f'--job_name={c["JOBNAME"]}',
        f'--save_main_session',
        f'--staging_location=gs://{c["BUCKET_NAME"]}/{c["STAGING_LOCATION"]}',
        f'--temp_location=gs://{c["BUCKET_NAME"]}/{c["TEMP_LOCATION"]}',
        f'--network={c["NETWORKPATH"]}',
        f'--subnetwork={c["SUBNETWORKPATH"]}',
        f'--region={c["REGION"]}',
        f'--service_account_email={c["SERVICE_ACCOUNT"]}',
        # f'--setup_file=./setup.py',
        # f'--autoscaling_algorithm=THROUGHPUT_BASED',
        # f'--maxWorkers=15',
        # f'--experiments=shuffle_mode=service',
        '--no_use_public_ips',
        f'--streaming'
    ]

    if c['FILE_LOAD']:
        argv.append('--experiments=allow_non_updatable_job')
        argv.append('--experiments=use_beam_bq_sink')

    p = beam.Pipeline(argv=argv)
    valid_msgs = (p
                          | 'Read from Pubsub' >>
                          beam.io.ReadFromPubSub(subscription=c['SUBSCRIPTION']).with_output_types(bytes)
                          )

    records = (valid_msgs
               | 'Event Parser(BQ Row) ' >> beam.Map(get_values)
               )

    # Load data to BigQuery using - 'Load Jobs' or 'Streaming Insert', choice based on latency expectation.
    if c['FILE_LOAD']:
        records | 'Write Result to BQ' >> beam.io.WriteToBigQuery(c["RAW_TABLE"],
                                                                  project=c["PROJECT"],
                                                                  dataset=c["DATASET_NAME"],
                                                                  method='FILE_LOADS',
                                                                  triggering_frequency=c['FILE_LOAD_FREQUENCY'],
                                                                  create_disposition=CreateDisposition.CREATE_NEVER,
                                                                  write_disposition=WriteDisposition.WRITE_APPEND
                                                                  )

        
    else:
        records | 'Write Result to BQ' >> beam.io.WriteToBigQuery(c["RAW_TABLE"],
                                                                  project=c["PROJECT"],
                                                                  dataset=c["DATASET_NAME"],
                                                                  create_disposition=CreateDisposition.CREATE_NEVER,
                                                                  write_disposition=WriteDisposition.WRITE_APPEND,
                                                                  insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
                                                                  )

    

    p.run()

Error from the Dataflow job:

message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.' reason: 'invalid'>
[while running 'generatedPtransform-1801']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)

Answer

The issue looks like a bad load into BigQuery. My recommendation is to run a test load job outside Dataflow, to make sure your schema and data structure are fine. You can follow the BQ documentation on load jobs.
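
If it helps, here is a minimal sketch of such a standalone test load using the google-cloud-bigquery client. The project, GCS path, and table name are placeholders; the idea is simply to point a load job at one of the temporary JSON files that Beam left in the bucket and see what BigQuery complains about:

from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    create_disposition=bigquery.CreateDisposition.CREATE_NEVER,
)

# Point this at one of the temporary files the FILE_LOADS sink wrote to GCS.
load_job = client.load_table_from_uri(
    "gs://my-bucket/path/to/temp-file",   # placeholder URI
    "my-project.my_dataset.raw_events",   # placeholder table
    job_config=job_config,
)

try:
    load_job.result()  # waits for completion; raises on failure
    print("Load succeeded:", load_job.output_rows, "rows")
except Exception:
    # These per-row errors are the errors[] collection the Dataflow message refers to.
    for err in load_job.errors or []:
        print(err)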

Also, I noticed that you are not specifying a schema nor SCHEMA_AUTODETECT. I suggest you specify it.
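
As a sketch of what that could look like in your pipeline: WriteToBigQuery accepts a schema dictionary in the same style as the one used in the pipeline at the end of this answer. The field names and types below are only guesses based on get_values and the sample events; since CREATE_NEVER is used, they must match the existing table exactly:

# Hypothetical schema mirroring the row built by get_values; the types are
# assumptions and need to match the real table definition.
table_schema = {
    "fields": [
        {"name": "event", "type": "STRING"},
        {"name": "entity_type", "type": "STRING"},
        {"name": "entity_id", "type": "INTEGER"},
        {"name": "target_entity_type", "type": "STRING"},
        {"name": "target_entity_id", "type": "INTEGER"},
        {"name": "event_time", "type": "TIMESTAMP"},
        {"name": "properties", "type": "RECORD", "mode": "REPEATED",
         "fields": [
             {"name": "property_name", "type": "STRING"},
             {"name": "property_value", "type": "STRING"},
         ]},
    ]
}

records | 'Write Result to BQ' >> beam.io.WriteToBigQuery(c["RAW_TABLE"],
                                                          project=c["PROJECT"],
                                                          dataset=c["DATASET_NAME"],
                                                          schema=table_schema,
                                                          method='FILE_LOADS',
                                                          triggering_frequency=c['FILE_LOAD_FREQUENCY'],
                                                          create_disposition=CreateDisposition.CREATE_NEVER,
                                                          write_disposition=WriteDisposition.WRITE_APPEND)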

To understand the errors, try to inspect the Dataflow job logs; they probably contain a lot of info. If your load jobs are failing, you can also inspect them in BigQuery, which will give you more detail about what's failing. You can use this Stackdriver log filter to find the BQ load job IDs:

resource.type="dataflow_step"
resource.labels.job_id= < YOUR DF JOB ID >
jsonPayload.message:("Triggering job" OR "beam_load")
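
Once those log entries give you a load job ID, a quick way to pull the job's error details is the BigQuery client; the job ID and location below are placeholders:

# Sketch: fetch a BigQuery load job by ID and print its error details.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")         # placeholder project
job = client.get_job("beam_load_...", location="US")   # placeholder job ID and location

print("State:", job.state)
print("Error result:", job.error_result)  # summary error if the job failed
for err in job.errors or []:              # the errors[] collection from the message
    print(err)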

I'm quite convinced the issue is caused either by the repeated field properties or by the schema; considering it only fails with load jobs, the schema seems more likely (maybe the schema for that table is wrong). In any case, here is a working pipeline; I tested it on my side and both BQ inserts worked:

        schema = {
            "fields":
                [
                    {
                        "name": "name",
                        "type": "STRING"
                    },
                    {
                        "name": "repeated",
                        "type": "RECORD",
                        "mode": "REPEATED",
                        "fields": [
                            {
                                "name": "spent",
                                "type": "INTEGER"
                            },
                            {
                                "name": "ts",
                                "type": "TIMESTAMP"
                            }
                        ]
                    }
                ]
            }

        def fake_parsing(element):
            # Using a fake parse so it's easier to reproduce
            properties = []

            rnd = random.random()
            if rnd < 0.25:
                dict_prop = {"spent": random.randint(0, 100),
                             "ts": datetime.now().strftime('%Y-%m-%d %H:%M:00')}
                properties.append(dict_prop)
            elif rnd > 0.75:
                # repeated
                dict_prop = {"spent": random.randint(0, 100),
                             "ts": datetime.now().strftime('%Y-%m-%d %H:%M:00')}
                properties += [dict_prop, dict_prop]
            elif 0.25 < rnd < 0.5:
                # record with only a timestamp and no 'spent'
                properties.append({"ts": datetime.now().strftime('%Y-%m-%d %H:%M:00')})

            return {"name": 'inigo',
                    "repeated": properties}

        pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=known_args.topic)
                    | "To Dict" >> beam.Map(fake_parsing))

        pubsub | "Stream To BQ" >> WriteToBigQuery(
            table=f"{known_args.table}_streaming_insert",
            schema=schema,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            method="STREAMING_INSERTS")

        pubsub | "Load To BQ" >> WriteToBigQuery(
            table=f"{known_args.table}_load_job",
            schema=schema,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            method=WriteToBigQuery.Method.FILE_LOADS,
            triggering_frequency=known_args.triggering,
            insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR")

I would recommend trying parts of the pipeline instead of everything at once: first try just the load jobs and, if they fail, inspect why they are failing (in the Dataflow logs, the BigQuery logs, or the BigQuery UI). Once that works, add the streaming inserts (or the other way around).
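
As an alternative to hunting for job IDs in the logs, you could also list the project's recent jobs and surface the failed load jobs directly. A small sketch, again with a placeholder project:

# Sketch: list recent BigQuery jobs and print the failed load jobs.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project

for job in client.list_jobs(max_results=50, all_users=True, state_filter="done"):
    if job.job_type == "load" and job.error_result:
        print(job.job_id, "->", job.error_result.get("message"))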
