Start CloudSQL Proxy on Python Dataflow / Apache Beam
Problem description
I am currently working on an ETL Dataflow job (using the Apache Beam Python SDK) which queries data from CloudSQL (with psycopg2 and a custom ParDo) and writes it to BigQuery. My goal is to create a Dataflow template which I can start from App Engine using a Cron job.
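For context, launching such a template from App Engine typically goes through the Dataflow REST API. A minimal sketch of what that can look like, assuming Application Default Credentials; the project id, template path, and job name below are hypothetical:

from googleapiclient.discovery import build

def launch_template():
    # Build a Dataflow API client (uses Application Default Credentials)
    service = build('dataflow', 'v1b3')
    request = service.projects().templates().launch(
        projectId='my-project',                      # hypothetical project id
        gcsPath='gs://my-bucket/templates/etl_job',  # hypothetical template path
        body={
            'jobName': 'etl-job-from-cron',          # hypothetical job name
            'parameters': {},  # runtime parameters, if the template defines any
        })
    return request.execute()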
I have a version which works locally using the DirectRunner. For that I use the CloudSQL (Postgres) proxy client so that I can connect to the database on 127.0.0.1.
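With the proxy running locally (cloud_sql_proxy -instances=<INSTANCE_CONNECTION_NAME>=tcp:5432), the connection itself is plain psycopg2 against localhost. A minimal sketch with placeholder credentials:

import psycopg2

# The proxy forwards 127.0.0.1:5432 to the CloudSQL instance,
# so this looks like a connection to a local Postgres server.
con = psycopg2.connect(
    host='127.0.0.1',
    port=5432,
    dbname='my_database',    # placeholder
    user='my_user',          # placeholder
    password='my_password')  # placeholder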
When using the DataflowRunner with custom commands to start the proxy within a setup.py script, the job won't execute. It gets stuck repeating this log message:
Setting node annotation to enable volume controller attach/detach
A part of my setup.py looks like the following:
import logging
import subprocess

import setuptools

CUSTOM_COMMANDS = [
    ['echo', 'Custom command worked!'],
    ['wget', 'https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64', '-O', 'cloud_sql_proxy'],
    ['echo', 'Proxy downloaded'],
    ['chmod', '+x', 'cloud_sql_proxy']]


class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands."""

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        print('Running command: %s' % command_list)
        logging.info("Running custom commands")
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # Can use communicate(input='y\n'.encode()) if the command run requires
        # some confirmation.
        stdout_data, _ = p.communicate()
        print('Command output: %s' % stdout_data)
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (command_list, p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)
        # Launch the proxy in the background once it has been downloaded
        subprocess.Popen(['./cloud_sql_proxy', '-instances=bi-test-1:europe-west1:test-animal=tcp:5432'])
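For completeness, such a command class is typically hooked into setup() by overriding the build command, following the pattern from Beam's juliaset example. A sketch with a hypothetical package name:

from distutils.command.build import build as _build

class build(_build):
    """Build command that runs the custom commands first."""
    sub_commands = _build.sub_commands + [('CustomCommands', None)]

setuptools.setup(
    name='etl-job',  # hypothetical package name
    version='0.0.1',
    packages=setuptools.find_packages(),
    install_requires=['psycopg2'],
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    })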
I added the last line as a separate subprocess.Popen() within run() after reading this issue on GitHub from sthomp and this discussion on Stack Overflow. I also tried to play around with some parameters of subprocess.Popen.
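For illustration, these are the kinds of Popen parameters one might vary in such experiments, e.g. detaching the proxy from the setup script's stdio and session. This is a hedged sketch of an attempt, not a confirmed fix (none of my variations resolved the issue):

import os
import subprocess

# Start the proxy in its own session, with stdio detached, so it is not
# tied to the lifetime or pipes of the setup.py process.
proxy = subprocess.Popen(
    ['./cloud_sql_proxy', '-instances=bi-test-1:europe-west1:test-animal=tcp:5432'],
    stdout=open(os.devnull, 'w'),
    stderr=subprocess.STDOUT,
    preexec_fn=os.setsid)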
Another solution, mentioned by brodin, was to allow access from every IP address and to connect via username and password. In my understanding he does not claim this to be best practice.
Thanks in advance for your help.

!!! Workaround at the bottom of this post !!!
These are the error-level logs which occur during a job:
E EXT4-fs (dm-0): couldn't mount as ext3 due to feature incompatibilities
E Image garbage collection failed once. Stats initialization may not have completed yet: unable to find data for container /
E Failed to check if disk space is available for the runtime: failed to get fs info for "runtime": unable to find data for container /
E Failed to check if disk space is available on the root partition: failed to get fs info for "root": unable to find data for container /
E [ContainerManager]: Fail to get rootfs information unable to find data for container /
E Could not find capacity information for resource storage.kubernetes.io/scratch
E debconf: delaying package configuration, since apt-utils is not installed
E % Total % Received % Xferd Average Speed Time Time Time Current
E Dload Upload Total Spent Left Speed
E
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
100 3698 100 3698 0 0 25674 0 --:--:-- --:--:-- --:--:-- 25860
#-- HERE IS WHEN setup.py FOR MY JOB IS EXECUTED ---
E debconf: delaying package configuration, since apt-utils is not installed
E insserv: warning: current start runlevel(s) (empty) of script `stackdriver-extractor' overrides LSB defaults (2 3 4 5).
E insserv: warning: current stop runlevel(s) (0 1 2 3 4 5 6) of script `stackdriver-extractor' overrides LSB defaults (0 1 6).
E option = Interval; value = 60.000000;
E option = FQDNLookup; value = false;
E Created new plugin context.
E option = PIDFile; value = /var/run/stackdriver-agent.pid;
E option = Interval; value = 60.000000;
E option = FQDNLookup; value = false;
E Created new plugin context.
Here you can find all logs after the start of my custom setup.py (log level: any; all logs):
Job logs (I manually canceled the job after it had been stuck for a while):
2018-06-08 (08:02:20) Autoscaling is enabled for job 2018-06-07_23_02_20-5917188751755240698. The number of workers will b...
2018-06-08 (08:02:20) Autoscaling was automatically enabled for job 2018-06-07_23_02_20-5917188751755240698.
2018-06-08 (08:02:24) Checking required Cloud APIs are enabled.
2018-06-08 (08:02:24) Checking permissions granted to controller Service Account.
2018-06-08 (08:02:25) Worker configuration: n1-standard-1 in europe-west1-b.
2018-06-08 (08:02:25) Expanding CoGroupByKey operations into optimizable parts.
2018-06-08 (08:02:25) Combiner lifting skipped for step Save new watermarks/Write/WriteImpl/GroupByKey: GroupByKey not fol...
2018-06-08 (08:02:25) Combiner lifting skipped for step Group watermarks: GroupByKey not followed by a combiner.
2018-06-08 (08:02:25) Expanding GroupByKey operations into optimizable parts.
2018-06-08 (08:02:26) Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
2018-06-08 (08:02:26) Annotating graph with Autotuner information.
2018-06-08 (08:02:26) Fusing adjacent ParDo, Read, Write, and Flatten operations
2018-06-08 (08:02:26) Fusing consumer Get rows from CloudSQL tables into Begin pipeline with watermarks/Read
2018-06-08 (08:02:26) Fusing consumer Group watermarks/Write into Group watermarks/Reify
2018-06-08 (08:02:26) Fusing consumer Group watermarks/GroupByWindow into Group watermarks/Read
2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/WriteBundles/WriteBundles into Save new watermar...
2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/GroupByWindow into Save new watermark...
2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/Reify into Save new watermarks/Write/...
2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/Write into Save new watermarks/Write/...
2018-06-08 (08:02:26) Fusing consumer Write to BQ into Get rows from CloudSQL tables
2018-06-08 (08:02:26) Fusing consumer Group watermarks/Reify into Write to BQ
2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/Map(<lambda at iobase.py:926>) into Convert dict...
2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/WindowInto(WindowIntoFn) into Save new watermark...
2018-06-08 (08:02:26) Fusing consumer Convert dictionary list to single dictionary and json into Remove "watermark" label
2018-06-08 (08:02:26) Fusing consumer Remove "watermark" label into Group watermarks/GroupByWindow
2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/InitializeWrite into Save new watermarks/Write/W...
2018-06-08 (08:02:26) Workflow config is missing a default resource spec.
2018-06-08 (08:02:26) Adding StepResource setup and teardown to workflow graph.
2018-06-08 (08:02:26) Adding workflow start and stop steps.
2018-06-08 (08:02:26) Assigning stage ids.
2018-06-08 (08:02:26) Executing wait step start25
2018-06-08 (08:02:26) Executing operation Save new watermarks/Write/WriteImpl/DoOnce/Read+Save new watermarks/Write/WriteI...
2018-06-08 (08:02:26) Executing operation Save new watermarks/Write/WriteImpl/GroupByKey/Create
2018-06-08 (08:02:26) Starting worker pool setup.
2018-06-08 (08:02:26) Executing operation Group watermarks/Create
2018-06-08 (08:02:26) Starting 1 workers in europe-west1-b...
2018-06-08 (08:02:27) Value "Group watermarks/Session" materialized.
2018-06-08 (08:02:27) Value "Save new watermarks/Write/WriteImpl/GroupByKey/Session" materialized.
2018-06-08 (08:02:27) Executing operation Begin pipeline with watermarks/Read+Get rows from CloudSQL tables+Write to BQ+Gr...
2018-06-08 (08:02:36) Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently runnin...
2018-06-08 (08:02:46) Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently runnin...
2018-06-08 (08:03:05) Workers have started successfully.
2018-06-08 (08:11:37) Cancel request is committed for workflow job: 2018-06-07_23_02_20-5917188751755240698.
2018-06-08 (08:11:38) Cleaning up.
2018-06-08 (08:11:38) Starting worker pool teardown.
2018-06-08 (08:11:38) Stopping worker pool...
2018-06-08 (08:12:30) Autoscaling: Reduced the number of workers to 0 based on the rate of progress in the currently runni...
Stacktrace:
No errors have been received in this time period.
UPDATE: The workaround can be found in the answer below.
Recommended answer
Workaround Solution:
I finally found a workaround. The idea is to connect via the public IP of the CloudSQL instance. For that you need to allow connections to your CloudSQL instance from every IP:
- Go to the overview page of your CloudSQL instance in GCP
- Click on the Authorization tab
- Click on Add network and add 0.0.0.0/0 (!! this will allow every IP address to connect to your instance !!)
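The same rule can also be set programmatically through the Cloud SQL Admin API instead of the console. A hedged sketch; the project and instance names are placeholders, and note that this replaces the instance's existing authorized networks:

from googleapiclient.discovery import build

# Authorize connections from any IP (0.0.0.0/0) via the Cloud SQL Admin API.
service = build('sqladmin', 'v1beta4')
service.instances().patch(
    project='my-project',    # placeholder
    instance='my-instance',  # placeholder
    body={
        'settings': {
            'ipConfiguration': {
                'authorizedNetworks': [{'value': '0.0.0.0/0'}],
            }
        }
    }).execute()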
To add security to the process, I used SSL keys and only allowed SSL connections to the instance:
- Click on the SSL tab
- Click on Create a new certificate to create an SSL certificate for your server
- Click on Create a client certificate to create an SSL certificate for your client
- Click on Allow only SSL connections to reject all non-SSL connection attempts
After that I stored the certificates in a Google Cloud Storage bucket and loaded them before connecting within the Dataflow job, i.e.:
import os
import select
import stat

import psycopg2
import psycopg2.extensions
from google.cloud import storage


# Function to wait for an open connection when processing in parallel
def wait(conn):
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            # Wait until the socket is writable
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            # Wait until the socket is readable
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)


# Function which returns a connection which can be used for queries
def connect_to_db(host, hostaddr, dbname, user, password, sslmode='verify-full'):
    # Get keys from GCS
    client = storage.Client()
    bucket = client.get_bucket(<YOUR_BUCKET_NAME>)
    bucket.get_blob('PATH_TO/server-ca.pem').download_to_filename('server-ca.pem')
    bucket.get_blob('PATH_TO/client-key.pem').download_to_filename('client-key.pem')
    # libpq refuses a world-readable private key, so restrict permissions
    os.chmod("client-key.pem", stat.S_IRWXU)
    bucket.get_blob('PATH_TO/client-cert.pem').download_to_filename('client-cert.pem')

    sslrootcert = 'server-ca.pem'
    sslkey = 'client-key.pem'
    sslcert = 'client-cert.pem'

    con = psycopg2.connect(
        host=host,
        hostaddr=hostaddr,
        dbname=dbname,
        user=user,
        password=password,
        sslmode=sslmode,
        sslrootcert=sslrootcert,
        sslcert=sslcert,
        sslkey=sslkey)
    return con
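A hypothetical invocation, with placeholder values: with sslmode='verify-full', libpq checks host against the name in the server certificate, while hostaddr is the address actually connected to (here, the instance's public IP):

# All values below are placeholders.
con = connect_to_db(
    host='<SERVER_CERT_NAME>',   # must match the name in server-ca.pem
    hostaddr='35.195.1.2',       # hypothetical public IP of the instance
    dbname='my_database',
    user='my_user',
    password='my_password')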
I then use these functions in a custom ParDo to perform queries.
Minimal example:
import apache_beam as beam
from psycopg2.extras import RealDictCursor


class ReadSQLTableNames(beam.DoFn):
    '''
    ParDo class to get all table names of a given CloudSQL database.
    It will return each table name.
    '''

    def __init__(self, host, hostaddr, dbname, username, password):
        super(ReadSQLTableNames, self).__init__()
        self.host = host
        self.hostaddr = hostaddr
        self.dbname = dbname
        self.username = username
        self.password = password

    def process(self, element):
        # Connect to database
        con = connect_to_db(
            host=self.host,
            hostaddr=self.hostaddr,
            dbname=self.dbname,
            user=self.username,
            password=self.password)
        # Wait for free connection
        wait(con)
        # Create cursor to query data
        cur = con.cursor(cursor_factory=RealDictCursor)
        # Get all table names
        cur.execute(
            """
            SELECT
              tablename as table
            FROM pg_tables
            WHERE schemaname = 'public'
            """
        )
        table_names = cur.fetchall()
        cur.close()
        con.close()
        for table_name in table_names:
            yield table_name["table"]
A part of the pipeline could then look like this:
# Current workaround to query all tables:
# Create a dummy initiator PCollection with one element
init = p | 'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])

tables = init | 'Get table names' >> beam.ParDo(ReadSQLTableNames(
    host=known_args.host,
    hostaddr=known_args.hostaddr,
    dbname=known_args.db_name,
    username=known_args.user,
    password=known_args.password))
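Since the stated goal is writing the results to BigQuery, a pipeline like this would typically continue with a second DoFn that queries each table and a WriteToBigQuery sink. A hedged sketch: ReadRowsFromTable is a hypothetical DoFn analogous to ReadSQLTableNames, and the table and schema values are placeholders:

# Hypothetical continuation: query rows per table, then write them to BigQuery.
rows = tables | 'Get rows from CloudSQL tables' >> beam.ParDo(ReadRowsFromTable(
    host=known_args.host,
    hostaddr=known_args.hostaddr,
    dbname=known_args.db_name,
    username=known_args.user,
    password=known_args.password))

rows | 'Write to BQ' >> beam.io.WriteToBigQuery(
    table='my-project:my_dataset.my_table',  # placeholder
    schema='name:STRING',                    # placeholder schema
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)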
I hope this solution helps others with similar problems.