How do I stage a GCP/Apache Beam Dataflow template?


Problem description

OK, I have to be missing something here. What do I need to do to stage a pipeline as a template? When I try to stage my template via these instructions, it runs the module but doesn't stage anything. It appears to run as expected without errors, but I don't see any files actually get added to the bucket location listed in my --template_location. Should my Python code be showing up there? I assume so, right? I have made sure I have all the Beam and Google Cloud SDKs installed, but maybe I'm missing something? What do you need to do to stage this Dataflow template? Also, can I just manually drop the file in a bucket and run it from there? The following is the template I am currently playing with:

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

GC_PROJECT = 'my-proj'
BUCKET = 'test-bucket'
STAGING_BUCKET = '%s/test' % BUCKET
TEMP_BUCKET = '%s/test' % BUCKET
# RUNNER = 'DataflowRunner'
RUNNER = 'DirectRunner'

# pipeline_args = ['--save_main_session']
pipeline_args = []
pipeline_args.append('--project=%s' % GC_PROJECT)
pipeline_args.append('--runner=%s' % RUNNER)
pipeline_args.append('--staging_location=gs://%s' % STAGING_BUCKET)
pipeline_args.append('--temp_location=gs://%s' % TEMP_BUCKET)

BQ_DATASET = 'lake'
BQ_TABLE = 'whatever'

SCHEMA_OBJ = [
    {"name": "id", "type": "STRING", "description": ""},
    {"name": "value", "type": "STRING", "description": ""}
]


class ContactUploadOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--infile',
            type=str,
            help='path of input file',
            default='gs://%s/data_files/test.csv' % BUCKET)

def run(argv=None):
    print('running')
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    lines = (p
             | beam.Create([
                {"id": "some random name", "value": "i dont know"},
                {"id": "id2", "value": "whatever man"}]))

    schema_str = '{"fields": ' + json.dumps(SCHEMA_OBJ) + '}'
    schema = parse_table_schema_from_json(schema_str)
    output_destination = '%s.%s' % (BQ_DATASET, BQ_TABLE)
    (lines
        | 'Write lines to BigQuery' >> beam.io.WriteToBigQuery(
            output_destination,
            schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    p.run().wait_until_finish()


if __name__ == '__main__':
    run(pipeline_args)


Also, if someone could link some SDK documentation/resources that explain how/why the staging instructions above are supposed to work, that would be awesome!

Answer

The temp location is only where temporary files are staged while the job runs. You have not specified --template_location, which is where the template file will be created.
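For example, a minimal sketch of the change to the script above, assuming a hypothetical template path under your existing bucket: switch to the DataflowRunner (the DirectRunner does not write a template file to GCS) and pass --template_location pointing at the GCS path where the serialized template should be written.

RUNNER = 'DataflowRunner'  # DirectRunner will not stage a template to GCS

pipeline_args = []
pipeline_args.append('--project=%s' % GC_PROJECT)
pipeline_args.append('--runner=%s' % RUNNER)
pipeline_args.append('--staging_location=gs://%s' % STAGING_BUCKET)
pipeline_args.append('--temp_location=gs://%s' % TEMP_BUCKET)
# The missing piece: where the template file itself is written.
# 'templates/my_template' is a placeholder path, not from the question.
pipeline_args.append('--template_location=gs://%s/templates/my_template' % BUCKET)

After running the module with these options, the template file should appear at the --template_location path, and the dependencies at the staging location.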

See Creating templates and Running templates.
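For completeness, once the template file exists in GCS it can be launched without re-running the Python module, for example through the Dataflow REST API. A minimal sketch, where the project id, template path, and job name are placeholders taken from or modeled on the question:

from googleapiclient.discovery import build

project = 'my-proj'                                        # placeholder project id
template_path = 'gs://test-bucket/templates/my_template'   # path used at staging time

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
    projectId=project,
    gcsPath=template_path,
    body={'jobName': 'stage-template-test'},               # hypothetical job name
)
response = request.execute()
print(response)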
