How to trigger a Dataflow pipeline with a Cloud Function? (Python SDK)


Problem description

I have a Cloud Function that is triggered by Cloud Pub/Sub. I want the same function to trigger a Dataflow pipeline using the Python SDK. Here is my code:

import base64
def hello_pubsub(event, context):   
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'
    print('Message of pubsub : {}'.format(message))

I deploy the function this way:

gcloud beta functions deploy hello_pubsub  --runtime python37 --trigger-topic topic1

Answer

You have to embed your pipeline's Python code with your function. When your function is called, you simply call the pipeline's main function, which executes the pipeline defined in your file.
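Since the pipeline code ships inside the function's deployment package, the Beam SDK must also be declared in the function's requirements.txt. A minimal sketch (whether and how you pin the version is up to you):

# requirements.txt, deployed alongside main.py
apache-beam[gcp]   # Beam SDK with the GCP extras, including the Dataflow runner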

If you developed and tested your pipeline in Cloud Shell and already ran it on the Dataflow service, your code should have this structure:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

def run(argv=None, save_main_session=True):
  # Parse arguments and set options
  options = PipelineOptions(argv)
  options.view_as(SetupOptions).save_main_session = save_main_session
  # Start Pipeline in p variable
  p = beam.Pipeline(options=options)
  # Perform your transforms in Pipeline
  # ...
  # Run your Pipeline
  result = p.run()
  # Wait for the end of the pipeline
  result.wait_until_finish()

Thus, call this function with the correct arguments, especially runner=DataflowRunner, so that the Python code submits the pipeline to the Dataflow service.
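For example, a minimal sketch of the call from the Cloud Function (the project, region, bucket, and job name values are placeholders to replace with your own):

def hello_pubsub(event, context):
    # ... decode the Pub/Sub message as shown above ...
    # Submit the pipeline to the Dataflow service
    run(argv=[
        '--runner=DataflowRunner',
        '--project=my-project',                       # placeholder
        '--region=us-central1',                       # placeholder
        '--temp_location=gs://my-bucket/temp',        # placeholder
        '--staging_location=gs://my-bucket/staging',  # placeholder
        '--job_name=pipeline-from-function',          # placeholder
    ])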

Delete the result.wait_until_finish() at the end, because your function won't stay alive for the whole duration of the Dataflow job: with DataflowRunner, p.run() submits the job and returns without blocking.

You can also use a template if you want.
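For instance, here is a sketch of launching a classic template through the Dataflow REST API instead of embedding the pipeline (google-api-python-client is assumed in requirements.txt; the project, location, template path, and parameters are placeholders):

from googleapiclient.discovery import build

def hello_pubsub(event, context):
    # Build a client for the Dataflow REST API (v1b3)
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().locations().templates().launch(
        projectId='my-project',                          # placeholder
        location='us-central1',                          # placeholder
        gcsPath='gs://my-bucket/templates/my-template',  # placeholder: GCS path of the template
        body={
            'jobName': 'job-from-function',
            'parameters': {'input': 'gs://my-bucket/input.txt'},  # placeholder
        },
    )
    response = request.execute()
    print('Launched Dataflow job: {}'.format(response))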

