Dataflow GCS to BQ Problems


Problem description


Here's the situation: I have a set of files in GCS that are compressed and have a .gz file extension (i.e. 000000_[0-5].gz) that I am trying to import into a single BQ table. I have been executing commands from the command line to date, but wanted to accomplish this with Dataflow, potentially adding in some transformations in the future.

The data in the compressed GCS files is a complex JSON structure whose schema changes frequently, so it is easiest to load the entire file into BigQuery as a TSV with a single column called record, and then use JSON_EXTRACT functions within BQ to parse out the values as they are needed.
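For illustration only, here is a minimal sketch of how that single record column might be queried from Python once the load succeeds. This snippet is not part of the original post: it reuses the anonymized project/dataset/table placeholders from the question, and some_field is a made-up field name.

from google.cloud import bigquery

# Sketch of the single-column approach described above: each row lands in a
# STRING column named `record`, and individual fields are extracted at query
# time with JSON_EXTRACT. All names below are placeholders.
client = bigquery.Client(project='GCP_PROJECT_NAME')
query = """
    SELECT JSON_EXTRACT(record, '$.some_field') AS some_field
    FROM `GCP_PROJECT_NAME.DATASET_NAME.TABLE_NAME`
    LIMIT 10
"""
for row in client.query(query).result():
    print(row.some_field)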

Issue: I have written a Dataflow pipeline that will do the bare minimum in this scenario: read from GCS and write to a BigQuery table. When I execute this pipeline, however, I am getting a JSON parse error, shown here:

Error while reading data, error message: JSON table encountered too 
many errors, giving up. Rows: 1; errors: 1., error: Error while reading 
data, error message: JSON table encountered too many errors, giving up. 
Rows: 1; errors: 1., error: Error while reading data, error message: 
JSON parsing error in row starting at position 2630029539: Value 
encountered without start of object.

Below is my Dataflow script with some variables anonymized.

from __future__ import absolute_import

import argparse
import logging
import re
import json

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import Read
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):

  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://BUCKET_NAME/input-data/000000_0.gz',
                      help='Input file to process.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      '--runner=DataflowRunner',
      '--project=GCP_PROJECT_NAME',
      '--staging_location=gs://BUCKET_NAME/dataflow-staging',
      '--temp_location=gs://BUCKET_NAME/dataflow-temp',
      '--job_name=gcs-gzcomp-to-bq1',
  ])

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  with beam.Pipeline(options=pipeline_options) as p:

    (p | "ReadFromGCS" >> ReadFromText(known_args.input)
       | WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
           project='GCP_PROJECT_NAME', schema='record:string'))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

As you can see, I attempted to do the same thing as I am doing in the traditional load job, by specifying a schema containing only one column with a string type, but it is still failing.

Is there a way to explicitly tell Dataflow more details about how I want to import the GCS files? That is, can I specify TSV even though each line is a valid JSON object?

Also, if this error is related to anything else I may have screwed up, please call that out as well; I'm super new to Dataflow, but pretty experienced with BQ & some other GCP tools, so hoping to add this to my toolbelt.

Solution

I believe the input collection to WriteToBigQuery should be a collection of dictionaries (each key maps to a BigQuery column), rather than a collection of strings. Try passing through something like | beam.Map(lambda line: dict(record=line)).
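
A minimal sketch of that change against the pipeline from the question (the "WrapRecord" step label is illustrative, not from the original answer): the added beam.Map step wraps each line in a dict keyed by the record column, so WriteToBigQuery receives dictionaries rather than raw strings.

    (p | "ReadFromGCS" >> ReadFromText(known_args.input)
       # Wrap each raw line in a dict whose single key matches the BigQuery
       # column name, as suggested above.
       | "WrapRecord" >> beam.Map(lambda line: {'record': line})
       | WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
           project='GCP_PROJECT_NAME', schema='record:string'))

With the schema still declared as record:string, each line is stored verbatim and can be parsed later with JSON_EXTRACT, as described in the question.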
