How to trigger a Dataflow pipeline with a Cloud Function? (Python SDK)


Problem Description

I have a Cloud Function that is triggered by Cloud Pub/Sub. I want the same function to trigger a Dataflow pipeline using the Python SDK. Here is my code:

import base64

# Cloud Function entry point, triggered by messages on a Pub/Sub topic
def hello_pubsub(event, context):
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'
    print('Message of pubsub : {}'.format(message))

I deployed the function like this:

gcloud beta functions deploy hello_pubsub  --runtime python37 --trigger-topic topic1

Recommended Answer

You have to embed your pipeline's Python code in your function. When your function is called, you simply call the pipeline's main function, which executes the pipeline defined in your file.
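For the pipeline code to import inside the function, the Beam SDK has to be declared as a dependency of the function. A minimal sketch of the function's requirements.txt, assuming the pipeline uses the GCP extras (this file is not in the original answer):

# requirements.txt
apache-beam[gcp]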

If you developed and tested your pipeline in Cloud Shell and already ran it as a Dataflow pipeline, your code should have this structure:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

def run(argv=None, save_main_session=True):
    # Parse the arguments and set the pipeline options
    options = PipelineOptions(argv)
    options.view_as(SetupOptions).save_main_session = save_main_session
    # Start the pipeline in the p variable
    p = beam.Pipeline(options=options)
    # Perform your transforms on the pipeline here
    # ...
    # Run your pipeline
    result = p.run()
    # Wait for the end of the pipeline
    result.wait_until_finish()

Thus, call this function with the correct arguments, especially runner=DataflowRunner, to allow the Python code to submit the pipeline to the Dataflow service.

At the end, delete the result.wait_until_finish() call, because your function won't live for the entire duration of the Dataflow job.
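A minimal sketch of what the function might look like once wired together, assuming run() is the pipeline entry point shown above with the wait_until_finish() call removed; the project, region, and bucket names are placeholders, not values from the original question:

import base64

def hello_pubsub(event, context):
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'
    print('Message of pubsub : {}'.format(message))

    # Submit the pipeline to the Dataflow service and return immediately.
    # All names below are placeholders; adjust them to your project.
    run(argv=[
        '--runner=DataflowRunner',
        '--project=my-project',
        '--region=europe-west1',
        '--temp_location=gs://my-bucket/temp',
        '--staging_location=gs://my-bucket/staging',
        '--job_name=job-from-function',
    ])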

You can also use a template if you want.
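A minimal sketch of the template route, assuming a template has already been staged in Cloud Storage and that google-api-python-client is in the function's requirements.txt; all names and paths below are placeholders:

from googleapiclient.discovery import build

def launch_template(message):
    # Build a client for the Dataflow REST API (v1b3)
    dataflow = build('dataflow', 'v1b3')
    # Launch the pre-staged template; every name here is a placeholder
    request = dataflow.projects().locations().templates().launch(
        projectId='my-project',
        location='europe-west1',
        gcsPath='gs://my-bucket/templates/my-template',
        body={
            'jobName': 'job-from-function',
            'parameters': {'input': message},  # template-specific parameters
        },
    )
    return request.execute()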
