Google Cloud Dataflow Python, Retrieving Job ID

Question

I am currently working on a Dataflow Template in Python, and I would like to access the Job ID and use it to save to a specific Firestore Document.

Is it possible to access the Job ID?

I cannot find anything regarding this in the documentation.

Answer

You can do so by calling dataflow.projects().locations().jobs().list from within the pipeline (see the full code below). One possibility is to always invoke the template with the same job name, which would make sense; otherwise, the job prefix can be passed as a runtime parameter. The list of jobs is parsed with a regex to see if a job name contains the prefix and, if so, its job ID is returned. If there is more than one match, only the latest one (the one currently running) is returned.
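
As a minimal sketch of the runtime-parameter alternative mentioned above (the --job_prefix argument and the JobIdOptions class are assumptions, not part of the original code), the prefix could be exposed as a Beam ValueProvider:

from apache_beam.options.pipeline_options import PipelineOptions

class JobIdOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    # ValueProvider arguments are resolved when the template is executed,
    # not when it is staged
    parser.add_value_provider_argument(
        '--job_prefix', default='myjobprefix',
        help='Job name prefix to match when listing Dataflow jobs')

# Inside run(), the provider could be handed to the lookup step, e.g.:
#   job_prefix = pipeline_options.view_as(JobIdOptions).job_prefix
#   ... >> beam.Map(retrieve_job_id, job_prefix)
# and read with job_prefix.get() once the job is running.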

The template is staged, after defining the PROJECT and BUCKET variables, with:

python script.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location gs://$BUCKET/staging \
    --temp_location gs://$BUCKET/temp \
    --template_location gs://$BUCKET/templates/retrieve_job_id

Then, specify the desired job name (myjobprefix in my case) when executing the templated job:

gcloud dataflow jobs run myjobprefix \
   --gcs-location gs://$BUCKET/templates/retrieve_job_id
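
Alternatively, the staged template can be launched programmatically through the same v1b3 API. This is a hedged sketch (the project ID, bucket path, and region are placeholders); note that the launch response itself already contains the new job's ID:

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
dataflow = build('dataflow', 'v1b3', credentials=credentials)

# Launch the template staged at the given GCS path
response = dataflow.projects().locations().templates().launch(
    projectId='PROJECT_ID',
    location='us-central1',
    gcsPath='gs://BUCKET/templates/retrieve_job_id',
    body={'jobName': 'myjobprefix'},
).execute()

# The response includes the Job resource of the launched job
print(response['job']['id'])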

The retrieve_job_id function returns the job ID from within the running job; change job_prefix to match the name you used when launching the template.

import argparse, logging, re
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def retrieve_job_id(element):
  # Looks up the ID of the currently running job by matching its name prefix
  project = 'PROJECT_ID'
  job_prefix = "myjobprefix"
  location = 'us-central1'

  logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))

  try:
    # Authenticate with application default credentials and build the
    # Dataflow v1b3 API client
    credentials = GoogleCredentials.get_application_default()
    dataflow = build('dataflow', 'v1b3', credentials=credentials)

    # List all jobs in the project and region
    result = dataflow.projects().locations().jobs().list(
      projectId=project,
      location=location,
    ).execute()

    job_id = "none"

    # The most recent job is listed first, so the first match is the
    # latest (currently running) one
    for job in result.get('jobs', []):
      if re.search(re.escape(job_prefix), job['name']):
        job_id = job['id']
        break

    logging.info("Job ID: {}".format(job_id))
    return job_id

  except Exception as e:
    logging.error("Error retrieving Job ID")
    raise KeyError(e)


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  p = beam.Pipeline(options=pipeline_options)

  # A single dummy element kicks off the job ID lookup. Map (rather than
  # FlatMap) is used so the returned ID string is emitted as one element
  # instead of being split into characters.
  init_data = (p
               | 'Start' >> beam.Create(["Init pipeline"])
               | 'Retrieve Job ID' >> beam.Map(retrieve_job_id))

  p.run()


if __name__ == '__main__':
  run()
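
Once the ID is available inside the pipeline, it can be written to Firestore as the question intends. This is a minimal sketch, not part of the original answer; the collection and document names are assumptions:

from google.cloud import firestore

def save_job_id(job_id):
  # Store the job ID under a document keyed by the ID itself
  db = firestore.Client()
  db.collection('dataflow_runs').document(job_id).set({'job_id': job_id})
  return job_id

# Chained after the retrieval step:
#   init_data = (p
#                | 'Start' >> beam.Create(["Init pipeline"])
#                | 'Retrieve Job ID' >> beam.Map(retrieve_job_id)
#                | 'Save to Firestore' >> beam.Map(save_job_id))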
