Google Dataflow read from Spanner


Problem description


I am trying to read a table from a Google Spanner database and write it to a text file to do a backup, using Google Dataflow with the Python SDK. I have written the following script:

from __future__ import absolute_import

import argparse
import itertools
import logging
import re
import time
import datetime as dt

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io import WriteToText
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet

BUCKET_URL = 'gs://my_bucket'
OUTPUT = '%s/output/' % BUCKET_URL
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'
JOB_NAME = 'spanner-backup'
TABLE = 'my_table'


class SpannerSource(iobase.BoundedSource):
    def __init__(self):
        logging.info('Enter __init__')

        self.spannerOptions = {
            "id": PROJECT_ID,
            "instance": INSTANCE_ID,
            "database": DATABASE_ID
        }
        self.SpannerClient = Client

    def estimate_size(self):
        logging.info('Enter estimate_size')
        return 1

    def get_range_tracker(self, start_position=None, stop_position=None):
        logging.info('Enter get_range_tracker')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = OffsetRangeTracker.OFFSET_INFINITY

        range_tracker = OffsetRangeTracker(start_position, stop_position)
        return UnsplittableRangeTracker(range_tracker)

    def read(self, range_tracker):  # This is not called when using the DataflowRunner!
        logging.info('Enter read')
        # instantiate the Spanner client
        spanner_client = self.SpannerClient(self.spannerOptions["id"])
        instance = spanner_client.instance(self.spannerOptions["instance"])
        database = instance.database(self.spannerOptions["database"])

        # read the column names of the table, then read the table itself
        table_fields = database.execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % TABLE)
        table_fields.consume_all()
        self.columns = [x[0] for x in table_fields]
        keyset = KeySet(all_=True)
        results = database.read(table=TABLE, columns=self.columns, keyset=keyset)

        # iterate over rows, yielding each one as a dict keyed by column name
        results.consume_all()
        for row in results:
            JSON_row = {
                self.columns[i]: row[i] for i in range(len(self.columns))
            }
            yield JSON_row

    def split(self, start_position=None, stop_position=None):
        # this should not be called since the source is unsplittable
        logging.info('Enter split')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = 1

        # Because the source is unsplittable (for now), only a single source is returned
        yield iobase.SourceBundle(
            weight=1,
            source=self,
            start_position=start_position,
            stop_position=stop_position)


def run(argv=None):
  """Main entry point"""
  pipeline_options = PipelineOptions()
  google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
  google_cloud_options.project = PROJECT_ID
  google_cloud_options.job_name = JOB_NAME
  google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
  google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL

  #pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
  pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
  p = beam.Pipeline(options=pipeline_options)

  output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())
  iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
  output | 'Store in GCS' >> WriteToText(file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE, file_name_suffix='')  # if this line is commented, job completes but does not do anything


  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()


However, this script runs correctly only on the DirectRunner: when I let it run on the DataflowRunner, it runs for a while without any output, before exiting with an error:


"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."


Sometimes, it just goes on forever, without creating an output.


Moreover, if I comment the line 'output = ...', the job completes, but without actually reading the data.


It also appears that the DataflowRunner calls the source's 'estimate_size' function, but not the 'read' or 'get_range_tracker' functions.
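
For reference, here is a minimal sketch (an assumption of roughly what a runner does with a BoundedSource, not the runner's actual code) that exercises the source's methods directly; it assumes it runs in the same module as SpannerSource above and that valid Spanner credentials are available:

# exercise the custom source locally, in roughly the order a runner would
source = SpannerSource()
logging.info('estimated size: %s', source.estimate_size())

# for each bundle produced by split(), get a range tracker and pass it to read()
for bundle in source.split():
    tracker = bundle.source.get_range_tracker(bundle.start_position, bundle.stop_position)
    for row in bundle.source.read(tracker):
        logging.info('row: %s', row)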


Does anyone have any ideas about what may cause this? I know there is a (more complete) Java SDK with an experimental Spanner source/sink available, but if possible I'd rather stick with Python.

Thanks

Recommended answer


Google has since added support for backing up Spanner with Dataflow: you can choose the related Google-provided template when creating a Dataflow job.
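
If you prefer to launch that template programmatically rather than from the console, the sketch below uses the Dataflow templates API from Python, reusing the constants from the script above. The template path ('Cloud_Spanner_to_GCS_Avro') and its parameter names (instanceId, databaseId, outputDir) are assumptions based on the Google-provided "Cloud Spanner to Avro files on Cloud Storage" template, so verify them against the current template documentation:

# Hedged sketch: launch the Google-provided Spanner export template.
# Assumes Application Default Credentials and the google-api-python-client package;
# the template path and parameter names are assumptions, not verified values.
from googleapiclient.discovery import build

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
    projectId=PROJECT_ID,
    gcsPath='gs://dataflow-templates/latest/Cloud_Spanner_to_GCS_Avro',
    body={
        'jobName': 'spanner-export',
        'parameters': {
            'instanceId': INSTANCE_ID,
            'databaseId': DATABASE_ID,
            'outputDir': '%s/export' % BUCKET_URL,
        },
    },
)
response = request.execute()
print(response)

The same template can also be launched with the gcloud CLI (gcloud dataflow jobs run, pointing --gcs-location at the template path) or from the "Create job from template" page in the Cloud Console.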

