Google Cloud Dataflow Python, Retrieving Job ID
Question
I am currently working on a Dataflow Template in Python, and I would like to access the Job ID and use it to save to a specific Firestore Document.
Is it possible to access the Job ID?
I cannot find anything regarding this in the documentation.
Answer
You can do so by calling dataflow.projects().locations().jobs().list from within the pipeline (see the full code below). One possibility is to always invoke the template with the same job name, which would make sense; otherwise, the job prefix could be passed as a runtime parameter. The list of jobs is parsed with a regex to see whether each job name contains the prefix and, if so, the job ID is returned. If there is more than one match, only the latest one (the one currently running) is returned.
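The matching logic described above can be sketched in isolation (the sample job list and IDs below are made up for illustration; in the real pipeline the list comes from the jobs().list API response):

```python
import re

def latest_job_id(jobs, job_prefix):
    """Return the ID of the first job whose name contains the prefix, or None.

    The Dataflow list endpoint returns jobs newest-first, so the first
    match is the most recent (currently running) job.
    """
    for job in jobs:
        if re.search(re.escape(job_prefix), job['name']):
            return job['id']
    return None

# Illustrative response fragment, newest job first.
jobs = [
    {'name': 'myjobprefix-2019-05-02', 'id': '2019-05-02_00_00_00-222'},
    {'name': 'myjobprefix-2019-05-01', 'id': '2019-05-01_00_00_00-111'},
    {'name': 'otherjob', 'id': '2019-04-30_00_00_00-000'},
]
print(latest_job_id(jobs, 'myjobprefix'))  # → 2019-05-02_00_00_00-222
```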
The template is staged, after defining the PROJECT and BUCKET variables, with:
python script.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location gs://$BUCKET/staging \
    --temp_location gs://$BUCKET/temp \
    --template_location gs://$BUCKET/templates/retrieve_job_id
Then, specify the desired job name (myjobprefix in my case) when executing the templated job:
gcloud dataflow jobs run myjobprefix \
    --gcs-location gs://$BUCKET/templates/retrieve_job_id
The retrieve_job_id function will return the job ID from within the job; change job_prefix to match the name you chose.
import argparse, logging, re

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def retrieve_job_id(element):
    project = 'PROJECT_ID'
    job_prefix = "myjobprefix"
    location = 'us-central1'

    logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))

    try:
        credentials = GoogleCredentials.get_application_default()
        dataflow = build('dataflow', 'v1b3', credentials=credentials)

        result = dataflow.projects().locations().jobs().list(
            projectId=project,
            location=location,
        ).execute()

        # Jobs are returned newest-first, so the first name match
        # is the latest (currently running) job.
        job_id = "none"
        for job in result.get('jobs', []):
            if re.search(re.escape(job_prefix), job['name']):
                job_id = job['id']
                break

        logging.info("Job ID: {}".format(job_id))
        return job_id

    except Exception:
        logging.info("Error retrieving Job ID")
        raise


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    init_data = (p
                 | 'Start' >> beam.Create(["Init pipeline"])
                 | 'Retrieve Job ID' >> beam.Map(retrieve_job_id))  # Map, not FlatMap: the ID is a single string

    p.run()


if __name__ == '__main__':
    run()
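The question also asks about saving the ID to a specific Firestore document. A minimal sketch of that step, assuming the google-cloud-firestore client library (the dataflow_runs/latest document path is made up for illustration, and the client is passed in as a parameter so the function can be exercised without real credentials):

```python
def save_job_id(job_id, client, doc_path=('dataflow_runs', 'latest')):
    """Write the Dataflow job ID into a Firestore document and pass the ID on.

    `client` is expected to expose the Firestore-style
    collection(...).document(...).set(...) chain.
    """
    collection, document = doc_path
    client.collection(collection).document(document).set({'job_id': job_id})
    return job_id
```

Inside run(), this could chain after the retrieval step, e.g. `| 'Save Job ID' >> beam.Map(save_job_id, firestore.Client())` with `from google.cloud import firestore` at the top of the script; in a production pipeline you would typically create the client inside a DoFn's setup method instead, to avoid pickling it with the pipeline.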