How to set up an SSH tunnel in Google Cloud Dataflow to an external database server?

Problem description

I am having trouble getting my Apache Beam pipeline to work on Cloud Dataflow with the DataflowRunner.

The first step of the pipeline is to connect to an external PostgreSQL server and extract some data. The server is hosted on a VM that is only reachable from outside over SSH (port 22). I can't change these firewall rules, so the only way to reach the DB server is through SSH tunneling, a.k.a. port forwarding.

In my code I use the Python library sshtunnel. It works perfectly when I launch the pipeline from my development computer with the DirectRunner:

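import apache_beam as beam
from apache_beam.io import WriteToText
from pysql_beam.sql_io.sql import ReadFromSQL  # pysql-beam; import path assumed
from pysql_beam.sql_io.wrapper import PostgresWrapper  # import path assumed
from sshtunnel import open_tunnel

# user_options, pipeline_options, select_query, headers and DictToCSV
# are defined elsewhere in my code.

# The tunnel is opened by the process that builds and submits the pipeline.
with open_tunnel(
    (user_options.ssh_tunnel_host, user_options.ssh_tunnel_port),
    ssh_username=user_options.ssh_tunnel_user,
    ssh_password=user_options.ssh_tunnel_password,
    remote_bind_address=(user_options.dbhost, user_options.dbport),
) as tunnel:
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Read data" >> ReadFromSQL(
                host=tunnel.local_bind_host,
                port=tunnel.local_bind_port,
                username=user_options.dbusername,
                password=user_options.dbpassword,
                database=user_options.dbname,
                wrapper=PostgresWrapper,
                query=select_query,
            )
            | "Format CSV" >> DictToCSV(headers)
            | "Write CSV" >> WriteToText(user_options.export_location)
        )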

I launched the same code with the DataflowRunner inside a non-default VPC. All ingress is denied there, egress is unrestricted, and CloudNAT is configured. The job fails with this message:

psycopg2.OperationalError: could not connect to server: Connection refused Is the server running on host "0.0.0.0" and accepting TCP/IP connections on port 41697? [while running 'Read data/Read']

So obviously something is wrong with my tunnel, but I can't pinpoint what. I was beginning to wonder whether a direct SSH tunnel was even possible through CloudNAT. Then I found this blog post: https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1 which states:

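A core strength of Cloud Dataflow is that you can call out to external services for data enrichment. For example, you can call a microservice to get additional data for an element. Within a DoFn, call out to the service (usually done via HTTP). You have full control to make any type of connection that you choose, so long as the firewall rules you set up within your project/network allow it.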

So it should be possible to set up this tunnel! I don't want to give up, but I don't know what to try next. Any ideas?

Thanks for reading.

Recommended answer

Problem solved! I can't believe I spent two full days on this... I was looking in completely the wrong direction.

The issue was not with some Dataflow or GCP networking configuration, and as far as I can tell...

You have full control to make any type of connection that you choose, so long as the firewall rules you set up within your project/network allow it

is true.

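The problem was, of course, in my code. It only showed up in a distributed environment. I had made the mistake of opening the tunnel from the main pipeline process instead of from the workers. So the SSH tunnel was up, but it connected the main pipeline to the target server, not the workers! The host and port passed to ReadFromSQL (0.0.0.0:41697 in the error above) pointed at the tunnel's local end on the launching machine. On a worker VM nothing listens on that port, hence "Connection refused".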
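
To make this failure mode concrete, here is a stdlib-only simulation. It is not Beam or sshtunnel code, and the function names are hypothetical. A local listening socket stands in for the tunnel's local end. Its host and port are pickled, roughly the way Beam pickles a pipeline's transforms before shipping them to workers. A "worker" then tries to connect to that address. While the listener exists on the same machine (the DirectRunner case), the connection succeeds. Once the listener is gone (a Dataflow worker, where it never existed), the connection is refused.

```python
import pickle
import socket


def launcher_side():
    """Hypothetical stand-in for opening the tunnel in the launching process."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # let the OS pick a free local port
    listener.listen()
    host, port = listener.getsockname()
    # Concrete host/port values are baked into what gets shipped to workers.
    payload = pickle.dumps({"host": host, "port": port})
    return listener, payload


def worker_side(payload):
    """Hypothetical stand-in for a worker using the baked-in address."""
    cfg = pickle.loads(payload)
    try:
        with socket.create_connection((cfg["host"], cfg["port"]), timeout=2):
            return "connected"
    except ConnectionRefusedError:
        return "refused"


listener, payload = launcher_side()
print(worker_side(payload))  # same machine, listener alive (DirectRunner case)
listener.close()             # a Dataflow worker never had this listener
print(worker_side(payload))
```

The address was perfectly valid where it was created. It simply means nothing on another machine, so the tunnel has to be opened by the code that runs on the worker.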

To fix this, I changed the DoFn that issues the query so that the query execution is wrapped in the tunnel:

import logging

from pysql_beam.sql_io import sql  # pysql-beam; import path assumed
from sshtunnel import open_tunnel


class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
    """Wraps SQLSourceDoFn in an SSH tunnel opened on the worker."""

    def __init__(self, *args, **kwargs):
        # Keep the real DB address; kwargs["port"] is overwritten per element.
        self.dbport = kwargs["port"]
        self.dbhost = kwargs["host"]
        self.args = args
        self.kwargs = kwargs
        super().__init__(*args, **kwargs)

    def process(self, query, *args, **kwargs):
        # Remote side of the SSH tunnel: the DB address as seen from the SSH host
        remote_address = (self.dbhost, self.dbport)
        ssh_tunnel = (self.kwargs["ssh_host"], self.kwargs["ssh_port"])
        # Opened inside process(), so it runs on the worker (once per element)
        with open_tunnel(
            ssh_tunnel,
            ssh_username=self.kwargs["ssh_user"],
            ssh_password=self.kwargs["ssh_password"],
            remote_bind_address=remote_address,
            set_keepalive=10.0,
        ) as tunnel:
            # Point the SQL client at the worker-local end of the tunnel
            self.kwargs["host"] = tunnel.local_bind_host
            self.kwargs["port"] = tunnel.local_bind_port
            source = sql.SQLSource(*self.args, **self.kwargs)
            sql.SQLSouceInput._build_value(source, source.runtime_params)
            logging.info("Processing - %s", query)
            for records, schema in source.client.read(query):
                for row in records:
                    yield source.client.row_as_dict(row, schema)

As you can see, I had to override some parts of the pysql_beam library.

Finally, each worker opens its own tunnel for every query it processes. This could probably be optimized, but it is good enough for my needs.
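
One common way to do that optimization is sketched below, under stated assumptions. Open the tunnel and the database connection once per DoFn instance in setup(), reuse them in process(), and close them in teardown(). Beam calls setup() once per DoFn instance on the worker, before any bundles are processed. TunnelPerWorkerQueryDoFn is a hypothetical name. The class bypasses pysql_beam entirely and runs plain DB-API queries. The two factories are injected so the lifecycle logic can run without an SSH server. In a real pipeline they would wrap sshtunnel.SSHTunnelForwarder and psycopg2.connect.

```python
try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:  # lets the sketch run where Beam is not installed
    _DoFnBase = object


class TunnelPerWorkerQueryDoFn(_DoFnBase):
    """Hypothetical DoFn: one SSH tunnel + DB connection per DoFn instance."""

    def __init__(self, tunnel_factory, connect):
        super().__init__()
        # tunnel_factory() -> started tunnel exposing .local_bind_port and .stop()
        self._tunnel_factory = tunnel_factory
        # connect(port) -> DB-API connection through the tunnel's local end
        self._connect = connect
        self._tunnel = None
        self._conn = None

    def setup(self):
        # Runs on the worker, once per DoFn instance, not once per element.
        self._tunnel = self._tunnel_factory()
        self._conn = self._connect(self._tunnel.local_bind_port)

    def process(self, query):
        # Each element is a SQL query string; rows are emitted as tuples.
        cursor = self._conn.cursor()
        try:
            cursor.execute(query)
            for row in cursor.fetchall():
                yield row
        finally:
            cursor.close()

    def teardown(self):
        # Best-effort cleanup: Beam does not guarantee teardown is called.
        if self._conn is not None:
            self._conn.close()
            self._conn = None
        if self._tunnel is not None:
            self._tunnel.stop()
            self._tunnel = None
```

In a pipeline you would apply it as p | beam.Create([select_query]) | beam.ParDo(TunnelPerWorkerQueryDoFn(make_tunnel, make_connection)). Here make_tunnel and make_connection are hypothetical module-level helpers. make_tunnel would build an SSHTunnelForwarder with the same SSH and remote_bind_address arguments as above, call .start() on it, and return it. make_connection would call psycopg2.connect(host="127.0.0.1", port=port, ...). Module-level functions are the safest choice for the factories, because Beam pickles the DoFn to ship it to the workers. A worker may also run several DoFn instances in parallel, so this gives one tunnel per instance rather than one per VM.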
