How to use GCP Cloud SQL as Dataflow source and/or sink with Python?


Question

Is there any guidance available to use Google Cloud SQL as a Dataflow read source and/or sink?

The Apache Beam Python SDK 2.1.0 documentation has no chapter mentioning Google Cloud SQL, though it does cover BigQuery.

And as I read the tutorial Performing ETL from a Relational Database into BigQuery, I saw that they used data exported to a file as the source in the process. That means there has to be an export step in between, and that's not ideal.

Are there any specific issues to take care of when using Cloud SQL, both as source and as sink?

Answer

The Beam Python SDK does not have a built-in transform to read data from a MySQL/Postgres database. Nonetheless, it should not be too troublesome to write a custom transform to do this. You can do something like this:

with beam.Pipeline() as p:
  query_result_pc = (p 
                     | beam.Create(['select a,b,c from table1'])
                     | beam.ParDo(QueryMySqlFn(host='...', user='...'))
                     | beam.Reshuffle())

To connect to MySQL, we'll use the mysql-specific library mysql.connector, but you can use the appropriate library for Postgres/etc.

Your query function would look like this:

import mysql.connector


class QueryMySqlFn(beam.DoFn):

  def __init__(self, **server_configuration):
    self.config = server_configuration

  def start_bundle(self):
    # Open one connection per bundle rather than per element.
    self.mydb = mysql.connector.connect(**self.config)
    self.cursor = self.mydb.cursor()

  def process(self, query):
    self.cursor.execute(query)
    for result in self.cursor:
      yield result

  def finish_bundle(self):
    self.cursor.close()
    self.mydb.close()
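
Cloud SQL exposes a standard MySQL/Postgres endpoint, so the DoFn above works with any DB-API 2.0 driver. As a minimal runnable illustration of the same execute-then-iterate cursor pattern, here is a sketch against the stdlib sqlite3 driver (the table and rows are made up for the example; no live database needed):

```python
import sqlite3

# A throwaway in-memory database standing in for Cloud SQL.
con = sqlite3.connect(":memory:")
con.execute("create table table1 (a integer, b integer, c text)")
con.executemany("insert into table1 values (?, ?, ?)",
                [(1, 2, "x"), (3, 4, "y")])

# The same pattern QueryMySqlFn.process uses: execute the query,
# then iterate the cursor, yielding one tuple per row.
cursor = con.cursor()
cursor.execute("select a, b, c from table1")
rows = [row for row in cursor]
print(rows)  # [(1, 2, 'x'), (3, 4, 'y')]
```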


For Postgres, you would use psycopg2 or any other library that allows you to connect to it:

import psycopg2


class QueryPostgresFn(beam.DoFn):

  def __init__(self, **server_config):
    self.config = server_config

  def process(self, query):
    # Note: this opens a new connection per element; for many
    # queries, open it in start_bundle as in QueryMySqlFn.
    con = psycopg2.connect(**self.config)
    cur = con.cursor()
    cur.execute(query)
    results = cur.fetchall()
    con.close()
    return results
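
The question also asks about the sink side. There is likewise no built-in Cloud SQL write transform, but the mirror-image DoFn pattern works: buffer incoming rows and flush them in batches with executemany. Below is a sketch, not a definitive implementation: WriteToSqlFn, the table name, and the batch size are made up, and stdlib sqlite3 stands in for mysql.connector/psycopg2 so the example runs standalone (in a real pipeline this would subclass beam.DoFn):

```python
import sqlite3


class WriteToSqlFn:
    """Hypothetical sink DoFn body: buffers rows, flushes in batches."""

    def __init__(self, db_path, batch_size=500):
        self.db_path = db_path
        self.batch_size = batch_size

    def start_bundle(self):
        # One connection and one buffer per bundle.
        self.con = sqlite3.connect(self.db_path)
        self.buffer = []

    def process(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= self.batch_size:
            self._flush()

    def _flush(self):
        # Parameterized batch insert; table/columns are assumptions.
        self.con.executemany(
            "insert into table1 values (?, ?, ?)", self.buffer)
        self.con.commit()
        self.buffer = []


# Usage sketch: drive the bundle lifecycle by hand.
sink = WriteToSqlFn(":memory:", batch_size=2)
sink.start_bundle()
sink.con.execute("create table table1 (a integer, b integer, c text)")
for row in [(1, 2, "x"), (3, 4, "y"), (5, 6, "z")]:
    sink.process(row)
sink._flush()  # flush the trailing partial batch (finish_bundle's job)
count = sink.con.execute("select count(*) from table1").fetchone()[0]
print(count)  # 3
sink.con.close()
```

A real finish_bundle would call _flush and close the connection; batching keeps the number of round-trips to the database small.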


FAQ

  • Why do you have a beam.Reshuffle transform there? - Because the QueryMySqlFn does not parallelize reading data from the database. The reshuffle will ensure that our data is parallelized downstream for further processing.
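
If the read itself must be parallelized rather than just the downstream processing, a common trick is to split one big query into several non-overlapping range queries at pipeline construction time, so that beam.Create emits many queries and different workers execute them concurrently. A sketch of a hypothetical helper (split_queries, the key column, and the bounds are all assumptions, not part of the original answer):

```python
def split_queries(table, key_column, max_key, num_splits):
    """Build non-overlapping range queries over an integer key column.

    Each returned query can be fed to beam.Create(...) so several
    workers read concurrently, instead of one worker reading
    everything before the Reshuffle.
    """
    step = max_key // num_splits + 1
    queries = []
    for lo in range(0, max_key + 1, step):
        hi = min(lo + step - 1, max_key)
        queries.append(
            "select a, b, c from {} where {} between {} and {}"
            .format(table, key_column, lo, hi))
    return queries


queries = split_queries("table1", "id", 99, 4)
print(queries[0])  # select a, b, c from table1 where id between 0 and 24
```

The resulting list would replace the single-element list in the beam.Create call shown earlier.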

