Google Dataflow: read from Spanner
Question
I am trying to read a table from a Google Cloud Spanner database and write it to a text file as a backup, using Google Dataflow with the Python SDK. I have written the following script:

from __future__ import absolute_import

import argparse
import itertools
import logging
import re
import time
import datetime as dt

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io import WriteToText
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet

BUCKET_URL = 'gs://my_bucket'
OUTPUT = '%s/output/' % BUCKET_URL
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'
JOB_NAME = 'spanner-backup'
TABLE = 'my_table'


class SpannerSource(iobase.BoundedSource):

    def __init__(self):
        logging.info('Enter __init__')
        self.spannerOptions = {
            "id": PROJECT_ID,
            "instance": INSTANCE_ID,
            "database": DATABASE_ID
        }
        self.SpannerClient = Client

    def estimate_size(self):
        logging.info('Enter estimate_size')
        return 1

    def get_range_tracker(self, start_position=None, stop_position=None):
        logging.info('Enter get_range_tracker')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = OffsetRangeTracker.OFFSET_INFINITY
        range_tracker = OffsetRangeTracker(start_position, stop_position)
        return UnsplittableRangeTracker(range_tracker)

    def read(self, range_tracker):  # This is not called when using the dataflowRunner !
        logging.info('Enter read')
        # instantiate spanner client
        spanner_client = self.SpannerClient(self.spannerOptions["id"])
        instance = spanner_client.instance(self.spannerOptions["instance"])
        database = instance.database(self.spannerOptions["database"])

        # read from table
        table_fields = database.execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % TABLE)
        table_fields.consume_all()
        self.columns = [x[0] for x in table_fields]
        keyset = KeySet(all_=True)
        results = database.read(table=TABLE, columns=self.columns, keyset=keyset)

        # iterator over rows
        results.consume_all()
        for row in results:
            JSON_row = {
                self.columns[i]: row[i] for i in range(len(self.columns))
            }
            yield JSON_row

    def split(self, start_position=None, stop_position=None):
        # this should not be called since the source is unsplittable
        logging.info('Enter split')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = 1

        # Because the source is unsplittable (for now), only a single source is returned
        yield iobase.SourceBundle(
            weight=1,
            source=self,
            start_position=start_position,
            stop_position=stop_position)


def run(argv=None):
    """Main entry point"""
    pipeline_options = PipelineOptions()
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = JOB_NAME
    google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
    google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL

    #pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
    p = beam.Pipeline(options=pipeline_options)

    output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())
    iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
    # if the next line is commented, job completes but does not do anything
    output | 'Store in GCS' >> WriteToText(file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE, file_name_suffix='')

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

However, this script runs correctly only on the DirectRunner. When I run it on the DataflowRunner, it runs for a while without producing any output, then exits with an error:
"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."
Sometimes it just runs forever without creating any output.
Moreover, if I comment out the line 'output = ...', the job completes, but without actually reading the data.
It also appears that the DataflowRunner calls the source's 'estimate_size' function, but not 'read' or 'get_range_tracker'.
Does anyone have any idea what might be causing this? I know there is a (more complete) Java SDK with an experimental Spanner source/sink available, but if possible I'd rather stick with Python.
Thanks
Recommended answer
Google has since added support for backing up Spanner with Dataflow: you can choose the relevant template when creating a Dataflow job.
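If you would rather keep a hand-written Python pipeline than use the template, one detail to settle is how each row becomes a line of text in the backup file. The following is only a minimal sketch, not the template's format: it assumes rows arrive as lists of values in the same order as the column names (as in the question's read method), the helper name row_to_json_line and the sample columns are hypothetical, and default=str is a lossy shortcut for values that json cannot encode natively.

```python
import datetime as dt
import json


def row_to_json_line(columns, row):
    """Pair each column name with its value and serialize the row as one JSON line.

    Hypothetical helper for illustration; not part of Beam or the Spanner client.
    """
    if len(columns) != len(row):
        raise ValueError("expected %d values, got %d" % (len(columns), len(row)))
    record = dict(zip(columns, row))
    # Assumption: values json cannot encode (e.g. datetimes) are stringified.
    # This loses type information, so a real backup may need a stricter encoding.
    return json.dumps(record, default=str, sort_keys=True)


if __name__ == "__main__":
    columns = ["id", "name", "created_at"]  # hypothetical column names
    row = [1, "alice", dt.datetime(2018, 1, 2, 3, 4, 5)]
    print(row_to_json_line(columns, row))
```

In a Beam pipeline, a function like this could be applied with beam.Map between the read step and WriteToText, so that each output line is a self-describing JSON record rather than the string form of a Python dict.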