Google Dataflow read from Spanner

Question

I am trying to read a table from a Google spanner database, and write it to a text file to do a backup, using google dataflow with the python sdk. I have written the following script:

from __future__ import absolute_import

import argparse
import itertools
import logging
import re
import time
import datetime as dt

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io import WriteToText
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet

BUCKET_URL = 'gs://my_bucket'
OUTPUT = '%s/output/' % BUCKET_URL
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'
JOB_NAME = 'spanner-backup'
TABLE = 'my_table'


class SpannerSource(iobase.BoundedSource):
    def __init__(self):
        logging.info('Enter __init__')

        self.spannerOptions = {
            "id": PROJECT_ID,
            "instance": INSTANCE_ID,
            "database": DATABASE_ID
        }
        self.SpannerClient = Client

    def estimate_size(self):
        logging.info('Enter estimate_size')
        return 1

    def get_range_tracker(self, start_position=None, stop_position=None):
        logging.info('Enter get_range_tracker')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = OffsetRangeTracker.OFFSET_INFINITY

        range_tracker = OffsetRangeTracker(start_position, stop_position)
        return UnsplittableRangeTracker(range_tracker)

    def read(self, range_tracker):  # This is not called when using the DataflowRunner!
        logging.info('Enter read')
        # instantiate the Spanner client
        spanner_client = self.SpannerClient(self.spannerOptions["id"])
        instance = spanner_client.instance(self.spannerOptions["instance"])
        database = instance.database(self.spannerOptions["database"])

        # look up the table's column names, then read the whole table
        table_fields = database.execute_sql(
            "SELECT t.column_name FROM information_schema.columns AS t "
            "WHERE t.table_name = '%s'" % TABLE)
        table_fields.consume_all()
        self.columns = [x[0] for x in table_fields]
        keyset = KeySet(all_=True)
        results = database.read(table=TABLE, columns=self.columns, keyset=keyset)

        # iterate over the rows, yielding each one as a dict keyed by column name
        results.consume_all()
        for row in results:
            JSON_row = {
                self.columns[i]: row[i] for i in range(len(self.columns))
            }
            yield JSON_row

    def split(self, start_position=None, stop_position=None):
        # this should not be called since the source is unsplittable
        logging.info('Enter split')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = 1

        # Because the source is unsplittable (for now), only a single source is returned
        yield iobase.SourceBundle(
            weight=1,
            source=self,
            start_position=start_position,
            stop_position=stop_position)


def run(argv=None):
  """Main entry point"""
  pipeline_options = PipelineOptions()
  google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
  google_cloud_options.project = PROJECT_ID
  google_cloud_options.job_name = JOB_NAME
  google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
  google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL

  #pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
  pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
  p = beam.Pipeline(options=pipeline_options)

  output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())
  iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
  output | 'Store in GCS' >> WriteToText(
      file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE,
      file_name_suffix='')  # if this line is commented, job completes but does not do anything


  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

However, this script runs correctly only on the DirectRunner: when I let it run on the DataflowRunner, it runs for a while without any output, before exiting with an error:

"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."

Sometimes, it just runs forever without creating any output.

Moreover, if I comment out the line 'output = ...', the job completes, but without actually reading the data.

It also appears that the DataflowRunner calls the source's 'estimate_size' function, but not its 'read' or 'get_range_tracker' functions.

Does anyone have any ideas about what may cause this? I know there is a (more complete) Java SDK with an experimental Spanner source/sink available, but if possible I'd rather stick with Python.

Thanks

Answer

Google has since added support for backing up Spanner with Dataflow; you can choose the related template when creating a Dataflow job.
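
If you want to launch that export template programmatically rather than from the Cloud Console, here is a minimal sketch using the Dataflow REST API (v1b3) through google-api-python-client. The template path and the instanceId/databaseId/outputDir parameter names are assumptions and should be checked against the current template documentation:

from googleapiclient.discovery import build

PROJECT_ID = 'my_project'   # reuse the constants from the script above
REGION = 'us-central1'      # assumed region for the job

# Build a client for the Dataflow REST API and launch the Spanner export template.
dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().locations().templates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    gcsPath='gs://dataflow-templates/latest/Cloud_Spanner_to_GCS_Avro',  # assumed template path
    body={
        'jobName': 'spanner-export',
        'parameters': {  # assumed parameter names for this template
            'instanceId': 'my_instance',
            'databaseId': 'my_db',
            'outputDir': 'gs://my_bucket/export',
        },
    },
)
response = request.execute()
print(response)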

More information: https://cloud.google.com/blog/products/gcp/cloud-spanner-adds-import-export-functionality-to-ease-data-movement
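
If you would still rather stay with the Python SDK, a commonly used workaround for custom-source problems like this is to drop the BoundedSource and do the Spanner read inside a ParDo. Below is a minimal sketch that reuses the constants and google.cloud.spanner calls from the question; note that the entire table is read by a single worker, so it does not parallelize the read the way a proper splittable source would:

import apache_beam as beam
from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet

# PROJECT_ID, INSTANCE_ID, DATABASE_ID and TABLE are the constants defined
# at the top of the script in the question.


class ReadSpannerTable(beam.DoFn):
    """Reads the whole table inside one DoFn call (no read parallelism)."""

    def process(self, element):
        # Create the client on the worker so nothing unpicklable is captured
        # when the pipeline is submitted.
        spanner_client = Client(PROJECT_ID)
        database = spanner_client.instance(INSTANCE_ID).database(DATABASE_ID)

        # Same column lookup and full-table read as in the question.
        fields = database.execute_sql(
            "SELECT t.column_name FROM information_schema.columns AS t "
            "WHERE t.table_name = '%s'" % TABLE)
        fields.consume_all()
        columns = [row[0] for row in fields]

        results = database.read(table=TABLE, columns=columns, keyset=KeySet(all_=True))
        results.consume_all()
        for row in results:
            yield dict(zip(columns, row))


# In run(), replace the beam.io.Read(...) step with:
#   output = (p
#             | 'Start' >> beam.Create(['go'])
#             | 'Read from Spanner' >> beam.ParDo(ReadSpannerTable()))

Because the read then happens in an ordinary DoFn, the DataflowRunner executes it like any other step, which avoids the custom BoundedSource code path whose read() was never being called.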
