Dataflow Streaming Job - Error writing to BigQuery


Question

Running into an error when writing to BigQuery through an Apache Beam Dataflow job using the 'FILE_LOADS' technique. The streaming insert (else block) works fine and as expected. The 'FILE_LOADS' branch (if block) fails with the error given below after the code. The temporary files in the GCS bucket are valid JSON objects.

Sample raw events from Pub/Sub:

"{'event': 'test', 'entityId': 13615316690, 'eventTime': '2020-08-12T15:56:07.130899+00:00', 'targetEntityId': 8947793, 'targetEntityType': 'item', 'entityType': 'guest', 'properties': {}}" 
 
"{'event': 'test', 'entityId': 13615316690, 'eventTime': '2020-08-12T15:56:07.130899+00:00', 'targetEntityId': 8947793, 'targetEntityType': 'item', 'entityType': 'guest', 'properties': {‘action’: ‘delete’}}"  

from __future__ import absolute_import

import logging
import sys
import traceback
import argparse
import ast
import json
import datetime
import dateutil.parser as date_parser

import apache_beam as beam
import apache_beam.pvalue as pvalue
from google.cloud.bigquery import CreateDisposition, WriteDisposition
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

def get_values(element):
    # convert properties from dict to arr of dicts to form a repeatable bq table record
    prop_list = [{'property_name': k, 'property_value': v} for k, v in element['properties'].items()]
    date_parsed = date_parser.parse(element.get('eventTime'))
    event_time = date_parsed.strftime('%Y-%m-%d %H:%M:00')
    
    raw_value = {'event': element.get('event'),
                 'entity_type': element.get('entityType'),
                 'entity_id': element.get('entityId'),
                 'target_entity_type': element.get('targetEntityType'),
                 'target_entity_id': element.get('targetEntityId'),
                 'event_time': event_time,
                 'properties': prop_list
                 }

    return raw_value

def stream_to_bq(c: dict):
    argv = [
        f'--project={c["PROJECT"]}',
        f'--runner=DataflowRunner',
        f'--job_name={c["JOBNAME"]}',
        f'--save_main_session',
        f'--staging_location=gs://{c["BUCKET_NAME"]}/{c["STAGING_LOCATION"]}',
        f'--temp_location=gs://{c["BUCKET_NAME"]}/{c["TEMP_LOCATION"]}',
        f'--network={c["NETWORKPATH"]}',
        f'--subnetwork={c["SUBNETWORKPATH"]}',
        f'--region={c["REGION"]}',
        f'--service_account_email={c["SERVICE_ACCOUNT"]}',
        # f'--setup_file=./setup.py',
        # f'--autoscaling_algorithm=THROUGHPUT_BASED',
        # f'--maxWorkers=15',
        # f'--experiments=shuffle_mode=service',
        '--no_use_public_ips',
        f'--streaming'
    ]

    if c['FILE_LOAD']:
        argv.append('--experiments=allow_non_updatable_job')
        argv.append('--experiments=use_beam_bq_sink')

    p = beam.Pipeline(argv=argv)
    valid_msgs = (p
                          | 'Read from Pubsub' >>
                          beam.io.ReadFromPubSub(subscription=c['SUBSCRIPTION']).with_output_types(bytes)
                          )

    records = (valid_msgs
               | 'Event Parser(BQ Row) ' >> beam.Map(get_values)
               )

    # Load data to BigQuery using - 'Load Jobs' or 'Streaming Insert', choice based on latency expectation.
    if c['FILE_LOAD']:
        records | 'Write Result to BQ' >> beam.io.WriteToBigQuery(c["RAW_TABLE"],
                                                                  project=c["PROJECT"],
                                                                  dataset=c["DATASET_NAME"],
                                                                  method='FILE_LOADS',
                                                                  triggering_frequency=c['FILE_LOAD_FREQUENCY'],
                                                                  create_disposition=CreateDisposition.CREATE_NEVER,
                                                                  write_disposition=WriteDisposition.WRITE_APPEND
                                                                  )

        
    else:
        records | 'Write Result to BQ' >> beam.io.WriteToBigQuery(c["RAW_TABLE"],
                                                                  project=c["PROJECT"],
                                                                  dataset=c["DATASET_NAME"],
                                                                  create_disposition=CreateDisposition.CREATE_NEVER,
                                                                  write_disposition=WriteDisposition.WRITE_APPEND,
                                                                  insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
                                                                  )

    

    p.run()

Error from the Dataflow job:

message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.' reason: 'invalid'> [while running 'generatedPtransform-1801'] java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)

Answer

The issue looks like a bad load into BigQuery. My recommendation is to try a test load job outside Dataflow, to be sure that your schema and data structure are fine. You can follow this BQ documentation.
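For example, a minimal standalone load job against one of the temporary JSON files the Beam sink left in the GCS bucket could look like the sketch below; the project, bucket path and table names are placeholders you would replace with your own:

from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project id

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    # autodetect=True,  # or set an explicit schema on the job config
)

load_job = client.load_table_from_uri(
    "gs://my-bucket/temp/bq_load/some-temp-file",   # placeholder temp file path
    "my-project.my_dataset.raw_table",              # placeholder destination table
    job_config=job_config,
)
load_job.result()  # raises on failure; load_job.errors has the row-level details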

Also, I noticed that you are not specifying a schema nor SCHEMA_AUTODETECT. I suggest you specify one.
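As a sketch (not the exact fix), the FILE_LOADS branch from the question with an explicit schema could look like this; the field names mirror get_values() above, but the types and the STRING property_value are assumptions about your table:

schema = {
    "fields": [
        {"name": "event", "type": "STRING"},
        {"name": "entity_type", "type": "STRING"},
        {"name": "entity_id", "type": "INTEGER"},
        {"name": "target_entity_type", "type": "STRING"},
        {"name": "target_entity_id", "type": "INTEGER"},
        {"name": "event_time", "type": "TIMESTAMP"},
        {"name": "properties", "type": "RECORD", "mode": "REPEATED",
         "fields": [
             {"name": "property_name", "type": "STRING"},
             {"name": "property_value", "type": "STRING"},
         ]},
    ]
}

records | 'Write Result to BQ' >> beam.io.WriteToBigQuery(
    c["RAW_TABLE"],
    project=c["PROJECT"],
    dataset=c["DATASET_NAME"],
    schema=schema,  # or schema=beam.io.WriteToBigQuery.SCHEMA_AUTODETECT
    method='FILE_LOADS',
    triggering_frequency=c['FILE_LOAD_FREQUENCY'],
    create_disposition=CreateDisposition.CREATE_NEVER,
    write_disposition=WriteDisposition.WRITE_APPEND)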

To understand the errors, try to inspect the Dataflow job logs; they probably contain a lot of info. If you have load jobs failing, you can inspect those in BigQuery as well, and they will give you more info about what's failing. You can use this Stackdriver log query to find the BQ load job IDs:

resource.type="dataflow_step"
resource.labels.job_id= < YOUR DF JOB ID >
jsonPayload.message:("Triggering job" OR "beam_load")
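
Once you have a load job ID from those log entries, you can also pull its error details programmatically; a small sketch using the BigQuery client (the project, job ID and location below are placeholders):

from google.cloud import bigquery

client = bigquery.Client(project="my-project")         # placeholder project id
job = client.get_job("beam_load_xxxx", location="US")  # placeholder job id/location
print(job.state, job.error_result)
for err in job.errors or []:  # per-row error details reported by the load job
    print(err)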

I'm quite convinced the issue is caused either by the repeated field properties or by the schema; considering it only fails with the load job, the schema seems more likely (maybe the schema for that table is wrong). In any case, here you have a working pipeline; I tested it on my side and both BQ inserts worked:

        schema = {
            "fields":
                [
                    {
                        "name": "name",
                        "type": "STRING"
                    },
                    {
                        "name": "repeated",
                        "type": "RECORD",
                        "mode": "REPEATED",
                        "fields": [
                            {
                                "name": "spent",
                                "type": "INTEGER"
                            },
                            {
                                "name": "ts",
                                "type": "TIMESTAMP"
                            }
                        ]
                    }
                ]
            }

        def fake_parsing(element):
            # Using a fake parse so it's easier to reproduce
            properties = []

            rnd = random.random()
            if rnd < 0.25:
                dict_prop = {"spent": random.randint(0, 100),
                             "ts": datetime.now().strftime('%Y-%m-%d %H:%M:00')}
                properties.append(dict_prop)
            elif rnd > 0.75:
                # repeated
                dict_prop = {"spent": random.randint(0, 100),
                             "ts": datetime.now().strftime('%Y-%m-%d %H:%M:00')}
                properties += [dict_prop, dict_prop]
            elif 0.5 < rnd < 0.75:
                properties.append({"ts": datetime.now().strftime('%Y-%m-%d %H:%M:00')})

            return {"name": 'inigo',
                    "repeated": properties}

        pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=known_args.topic)
                    | "To Dict" >> beam.Map(fake_parsing))

        pubsub | "Stream To BQ" >> WriteToBigQuery(
            table=f"{known_args.table}_streaming_insert",
            schema=schema,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            method="STREAMING_INSERTS")

        pubsub | "Load To BQ" >> WriteToBigQuery(
            table=f"{known_args.table}_load_job",
            schema=schema,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            method=WriteToBigQuery.Method.FILE_LOADS,
            triggering_frequency=known_args.triggering,
            insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR")

I would recommend you try parts of the pipeline instead of everything at once, i.e., try just the load jobs first and, if they fail, inspect why (in the Dataflow logs, BigQuery logs, or the BigQuery UI). Once that's done, add the streaming inserts (or the other way around).
