How to notify when DataFlow Job is complete


Problem description

I want to know, on GAE, when a Dataflow job has completed.

I tried making both of the following pipelines:

1.

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')

2.

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | 'DoPubSub' >> beam.ParDo(DoPubSub())   # do Publish using google.cloud.pubsub

But both of the above produce the following error:

AttributeError: 'PDone' object has no attribute 'windowing'

How can I run a step after WriteToBigQuery?

Note: I execute Dataflow using a template via REST, so I cannot use pipeline_result.wait_until_finish().

Edit:

The full stack trace is here:

File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 327, in <module>
   vital_data_export()
 File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 323, in vital_data_export
   result = p.run()
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 382, in run
   return self.runner.run_pipeline(self)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 285, in run_pipeline
   return_context=True)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 580, in to_runner_api
   root_transform_id = context.transforms.get_id(self._root_transform())
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 810, in to_runner_api
   for part in self.parts],
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in to_runner_api
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in <dictcomp>
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 144, in to_runner_api
   self.windowing))
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 128, in windowing
   self.producer.inputs)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\transforms\ptransform.py", line 443, in get_windowing
   return inputs[0].windowing
AttributeError: 'PDone' object has no attribute 'windowing'

Answer

You cannot.

PDone is the last stage of your pipeline, so applying a wait-for-completion transform to it is not necessary (or possible).

PInput and PDone are classes in Apache Beam that indicate a source and a sink, respectively. If you are trying to execute something after the BigQuery write, it is not possible unless you run two different Dataflow jobs in series.

If you are looking to run them in series, check out Apache Airflow.
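Since the job is launched from a template via REST (so `wait_until_finish()` is unavailable), one workaround outside the pipeline itself is to poll the job's state through the Dataflow REST API from the GAE side. Below is a minimal sketch, assuming you have the job ID returned by the template launch request and the `google-api-python-client` library; the project/region/job identifiers are placeholders you must supply:

```python
# Sketch: poll the Dataflow v1b3 REST API until the job reaches a
# terminal state. Not from the answer above; a common workaround when
# pipeline_result.wait_until_finish() cannot be used (template + REST).
import time

# Terminal job states as defined by the Dataflow API.
TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED",
    "JOB_STATE_DRAINED",
}


def is_terminal(state):
    """Return True once the job can no longer make progress."""
    return state in TERMINAL_STATES


def wait_for_job(project_id, region, job_id, poll_seconds=30):
    """Block until the given Dataflow job reaches a terminal state.

    Requires the google-api-python-client package and default
    application credentials with Dataflow read access.
    """
    from googleapiclient.discovery import build  # deferred: needs network/creds

    dataflow = build("dataflow", "v1b3")
    while True:
        job = (
            dataflow.projects()
            .locations()
            .jobs()
            .get(projectId=project_id, location=region, jobId=job_id)
            .execute()
        )
        state = job["currentState"]
        if is_terminal(state):
            return state
        time.sleep(poll_seconds)
```

You would call `wait_for_job("my-project", "us-central1", job_id)` (hypothetical values) after launching the template, then publish to Pub/Sub or trigger the next job yourself once it returns `JOB_STATE_DONE`.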

