Sideload static data


Question

When processing my data in a ParDo I need to use a JSON schema stored on Google Cloud Storage. I think this may be sideloading? I read the pages they call documentation (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.pvalue.html) and they contain something about apache_beam.pvalue.AsSingleton and apache_beam.pvalue.AsSideInput, but there are zero results if I Google their usage and I can't find any Python example.

How can I read a file from storage from within a ParDo? Or do I sideload it into my pipeline before the ParDo, and if so, how do I use this second source within the ParDo?

[Edit]

My main data comes from BQ: beam.io.Read(beam.io.BigQuerySource(...
The side input also comes from BQ, using the same BigQuerySource.

When I then add a step after the main data that side-inputs the other data, I get some strange errors. I notice that when I apply beam.Map(lambda x: x) to the side input, it works.

Side input

schema_data = (p
  | "read schema data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select * from `{schema_table}` limit 1", use_standard_sql=True, flatten_results=True))
  | beam.Map(lambda x: x))

Main data

source_data = (p | "read source data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select {columns} from `{source_table}` limit 10", use_standard_sql=True, flatten_results=True)))

Combining them

validated_records = source_data | 'record validation' >> beam.ParDo(Validate(), pvalue.AsList(schema_data))

Answer

I would use the docs you mention as a library reference and go through the Beam programming guide for a more detailed walkthrough: side input section. I'll try to help with a couple of examples in which we download a BigQuery schema from a public table and upload it to GCS:

bq show --schema bigquery-public-data:usa_names.usa_1910_current > schema.json
gsutil cp schema.json gs://$BUCKET
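
For reference, bq show --schema emits a JSON array of field descriptors, so the uploaded schema.json should look roughly like this (abridged):

[{"name": "state", "type": "STRING"},
 {"name": "gender", "type": "STRING"},
 {"name": "year", "type": "INTEGER"},
 {"name": "name", "type": "STRING"},
 {"name": "number", "type": "INTEGER"}]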

Our data will be some CSV rows without headers, so we have to use the schema stored in GCS:

data = [('NC', 'F', 2020, 'Hello', 3200),
        ('NC', 'F', 2020, 'World', 3180)]


Using side inputs

We read the JSON file into a schema PCollection:

schema = (p 
  | 'Read Schema from GCS' >> ReadFromText('gs://{}/schema.json'.format(BUCKET)))

We then pass it to the ParDo as a side input so that it's broadcast to every worker that executes the DoFn. In this case we can use AsSingleton, as we just want to supply the schema as a single value:

(p
  | 'Create Events' >> beam.Create(data) \
  | 'Enrich with side input' >> beam.ParDo(EnrichElementsFn(), pvalue.AsSingleton(schema)) \
  | 'Log elements' >> beam.ParDo(LogElementsFn()))
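
LogElementsFn is not shown in the original snippets; a minimal sketch consistent with the logged output further down would be:

import logging

class LogElementsFn(beam.DoFn):
  """Logs each element at INFO level and passes it through."""
  def process(self, element):
    logging.info(element)
    yield element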

Now we can access the schema in the process method of EnrichElementsFn:

class EnrichElementsFn(beam.DoFn):
  """Zips data with schema stored in GCS"""
  def process(self, element, schema):
    field_names = [x['name'] for x in json.loads(schema)]
    # list() so the pairs are materialized on Python 3, where zip is lazy
    yield list(zip(field_names, element))

Note that it would be better to do the schema processing (to construct field_names) before passing it in as a singleton, to avoid duplicated work, but this is just an illustrative example.
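
A minimal sketch of that optimization, reusing the schema PCollection from above (and assuming json is imported): parse the JSON once in the pipeline so workers receive the ready-made list of field names instead of the raw string:

field_names = (schema
  | 'Extract field names' >> beam.Map(lambda s: [f['name'] for f in json.loads(s)]))

class EnrichElementsFn(beam.DoFn):
  """Zips data with the pre-computed field names."""
  def process(self, element, field_names):
    yield list(zip(field_names, element))

(p
  | 'Create Events' >> beam.Create(data)
  | 'Enrich with side input' >> beam.ParDo(EnrichElementsFn(), pvalue.AsSingleton(field_names))
  | 'Log elements' >> beam.ParDo(LogElementsFn()))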

Using start_bundle

In this case we don't pass any additional input to the ParDo:

(p
  | 'Create Events' >> beam.Create(data) \
  | 'Enrich with start bundle' >> beam.ParDo(EnrichElementsFn()) \
  | 'Log elements' >> beam.ParDo(LogElementsFn()))

And now we use the Python client library (we need to install google-cloud-storage) to read the schema each time a worker initializes a bundle:

class EnrichElementsFn(beam.DoFn):
  """Zips data with schema stored in GCS"""
  def start_bundle(self):
    from google.cloud import storage

    client = storage.Client()
    blob = client.get_bucket(BUCKET).get_blob('schema.json')
    self.schema = blob.download_as_string()

  def process(self, element):
    field_names = [x['name'] for x in json.loads(self.schema)]
    yield list(zip(field_names, element))
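
Since start_bundle runs every time a worker starts a new bundle, the download is repeated per bundle. On SDK versions that support DoFn.setup (which should include 2.16.0), you could fetch and parse the schema once per DoFn instance instead; a sketch:

import json

class EnrichElementsFn(beam.DoFn):
  """Downloads and parses the schema once per DoFn instance."""
  def setup(self):
    from google.cloud import storage

    client = storage.Client()
    blob = client.get_bucket(BUCKET).get_blob('schema.json')
    self.field_names = [x['name'] for x in json.loads(blob.download_as_string())]

  def process(self, element):
    yield list(zip(self.field_names, element))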


The output is the same in both cases:

INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'Hello'), (u'number', 3200)]
INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'World'), (u'number', 3180)]

Tested with the 2.16.0 SDK and the DirectRunner.

You can find the full code for both examples here.

