How to set up an SSH tunnel in Google Cloud Dataflow to an external database server?

Problem description

I am having trouble getting my Apache Beam pipeline to run on Cloud Dataflow with the DataflowRunner.

The first step of the pipeline is to connect to an external PostgreSQL server hosted on a VM that is only externally accessible through SSH on port 22, and extract some data. I can't change these firewall rules, so I can only connect to the DB server via SSH tunneling, aka port forwarding.

In my code I use the Python library sshtunnel. It works perfectly when the pipeline is launched from my development computer with the DirectRunner:

from sshtunnel import open_tunnel

import apache_beam as beam
from apache_beam.io import WriteToText

# ReadFromSQL, DictToCSV and PostgresWrapper come from the pysql_beam
# helpers mentioned below; user_options holds my custom pipeline options.
with open_tunnel(
    (user_options.ssh_tunnel_host, user_options.ssh_tunnel_port),
    ssh_username=user_options.ssh_tunnel_user,
    ssh_password=user_options.ssh_tunnel_password,
    remote_bind_address=(user_options.dbhost, user_options.dbport)
) as tunnel:
    with beam.Pipeline(options=pipeline_options) as p:
        (p | "Read data" >> ReadFromSQL(
                # Connect through the locally forwarded end of the tunnel.
                host=tunnel.local_bind_host,
                port=tunnel.local_bind_port,
                username=user_options.dbusername,
                password=user_options.dbpassword,
                database=user_options.dbname,
                wrapper=PostgresWrapper,
                query=select_query
            )
            | "Format CSV" >> DictToCSV(headers)
            | "Write CSV" >> WriteToText(user_options.export_location)
        )
The same code, launched with the DataflowRunner inside a non-default VPC where all ingress is denied but there are no egress restrictions, and with Cloud NAT configured, fails with this message:

psycopg2.OperationalError: could not connect to server: Connection refused
    Is the server running on host "0.0.0.0" and accepting
    TCP/IP connections on port 41697?  [while running 'Read data/Read']
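
For reference, a minimal sketch of how such a job gets launched (the project, region, bucket, and subnetwork names below are placeholders for illustration, not my actual values):

from apache_beam.options.pipeline_options import PipelineOptions

# Sketch of DataflowRunner options for a non-default VPC behind Cloud NAT.
# All resource names below are placeholders.
pipeline_options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",                                     # placeholder
    region="europe-west1",                                    # placeholder
    temp_location="gs://my-bucket/tmp",                       # placeholder
    subnetwork="regions/europe-west1/subnetworks/my-subnet",  # placeholder
    use_public_ips=False,  # workers have no public IPs; egress goes via Cloud NAT
)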

So obviously something is wrong with my tunnel, but I cannot spot what exactly. I was beginning to wonder whether a direct SSH tunnel setup was even possible through Cloud NAT, until I found this blog post: https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1, which states:

A core strength of Cloud Dataflow is that you can call external services for data enrichment. For example, you can call a micro service to get additional data for an element. Within a DoFn, call-out to the service (usually done via HTTP). You have full control to make any type of connection that you choose, so long as the firewall rules you set up within your project/network allow it.

So it should be possible to set up this tunnel! I don't want to give up, but I don't know what to try next. Any ideas?

Thanks for reading

Answer

Problem solved! I can't believe I spent two full days on this... I was looking completely in the wrong direction.

The issue was not with any Dataflow or GCP networking configuration, and as far as I can tell...

You have full control to make any type of connection that you choose, so long as the firewall rules you set up within your project/network allow it

is true.

The problem was, of course, in my code: the problem was only revealed in a distributed environment. I had made the mistake of opening the tunnel from the main pipeline processor instead of from the workers. So the SSH tunnel was up, but not between the workers and the target server; only between the main pipeline and the target!

To fix this, I had to change my requesting DoFn to wrap the query execution in the tunnel:

import logging

from sshtunnel import open_tunnel
# sql is pysql_beam's SQL source module (it provides SQLSourceDoFn, SQLSource
# and the -- sic -- SQLSouceInput class); the exact import path depends on
# the pysql_beam version.


class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
    """Wraps SQLSourceDoFn in an SSH tunnel"""

    def __init__(self, *args, **kwargs):
        # Remember the real DB endpoint: it becomes the remote side of the tunnel.
        self.dbport = kwargs["port"]
        self.dbhost = kwargs["host"]
        self.args = args
        self.kwargs = kwargs
        super().__init__(*args, **kwargs)

    def process(self, query, *args, **kwargs):
        # The tunnel is opened here, on the worker, so the forwarded port
        # actually exists on the machine that runs the query.
        remote_address = (self.dbhost, self.dbport)
        ssh_tunnel = (self.kwargs["ssh_host"], self.kwargs["ssh_port"])
        with open_tunnel(
            ssh_tunnel,
            ssh_username=self.kwargs["ssh_user"],
            ssh_password=self.kwargs["ssh_password"],
            remote_bind_address=remote_address,
            set_keepalive=10.0
        ) as tunnel:
            # Point the source at the forwarded local port. (This assumes
            # dbhost also resolves locally, e.g. "localhost" with the DB on
            # the SSH host; otherwise kwargs["host"] would need to be set to
            # tunnel.local_bind_host as well.)
            forwarded_port = tunnel.local_bind_port
            self.kwargs["port"] = forwarded_port
            source = sql.SQLSource(*self.args, **self.kwargs)
            # "SQLSouceInput" is spelled this way in pysql_beam itself.
            sql.SQLSouceInput._build_value(source, source.runtime_params)
            logging.info("Processing - {}".format(query))
            for records, schema in source.client.read(query):
                for row in records:
                    yield source.client.row_as_dict(row, schema)

As you can see, I had to override some bits of the pysql_beam library.
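
For completeness, a minimal sketch of how this DoFn replaces the original ReadFromSQL step in the pipeline. The constructor kwargs simply mirror the ones consumed in the class above; the exact signature is whatever pysql_beam's SQLSourceDoFn expects, so treat this as an illustration rather than the library's canonical API:

# Sketch: feed the query through the tunnelled DoFn instead of ReadFromSQL.
with beam.Pipeline(options=pipeline_options) as p:
    (p
        | "Query" >> beam.Create([select_query])
        | "Read data" >> beam.ParDo(TunnelledSQLSourceDoFn(
            host=user_options.dbhost,
            port=user_options.dbport,
            username=user_options.dbusername,
            password=user_options.dbpassword,
            database=user_options.dbname,
            wrapper=PostgresWrapper,
            ssh_host=user_options.ssh_tunnel_host,
            ssh_port=user_options.ssh_tunnel_port,
            ssh_user=user_options.ssh_tunnel_user,
            ssh_password=user_options.ssh_tunnel_password,
        ))
        | "Format CSV" >> DictToCSV(headers)
        | "Write CSV" >> WriteToText(user_options.export_location)
    )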

In the end, each worker opens its own tunnel for each request. It's probably possible to optimize this behavior, but it's enough for my needs.
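
If I wanted to optimize it, one option would be to open the tunnel once per DoFn instance in setup() and close it in teardown(), so it is reused across bundles instead of being rebuilt for every query. A sketch of that idea, reusing the class above (untested, and assuming a long-lived tunnel is acceptable):

from sshtunnel import SSHTunnelForwarder


class PooledTunnelSQLSourceDoFn(TunnelledSQLSourceDoFn):
    """Sketch: one long-lived tunnel per DoFn instance instead of per query."""

    def setup(self):
        # Runs once per DoFn instance on the worker; the tunnel is then
        # reused across bundles instead of being rebuilt for every query.
        self.tunnel = SSHTunnelForwarder(
            (self.kwargs["ssh_host"], self.kwargs["ssh_port"]),
            ssh_username=self.kwargs["ssh_user"],
            ssh_password=self.kwargs["ssh_password"],
            remote_bind_address=(self.dbhost, self.dbport),
            set_keepalive=10.0,
        )
        self.tunnel.start()

    def process(self, query, *args, **kwargs):
        # Same reading logic as above, but pointing at the already-open tunnel.
        self.kwargs["port"] = self.tunnel.local_bind_port
        source = sql.SQLSource(*self.args, **self.kwargs)
        sql.SQLSouceInput._build_value(source, source.runtime_params)
        for records, schema in source.client.read(query):
            for row in records:
                yield source.client.row_as_dict(row, schema)

    def teardown(self):
        # Close the tunnel when the worker retires the DoFn instance.
        self.tunnel.stop()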
