Google Cloud Platform solution for serverless log ingestion (files downloading) from a SFTP server


Problem description

There was a question today that was deleted (I don't know why) while I was typing the answer. As the answer is long, I decided to recreate it and provide my answer anyway. Maybe it will be useful for somebody.

Here is the original question:

We have an SFTP server where Apache, Nginx, and WordPress logs are dumped. We want to back up those logs to Cloud Storage and, at the same time, parse their contents and insert them into BigQuery tables. I usually work with Cloud Functions (either Node.js or Python), and that is the go-to solution that first comes to mind.

However, a Cloud Function needs a trigger, which does not make sense if my objective is for a program to continuously watch/observe/listen on the SFTP folder for new files. If my requirement is less strict, I can trigger it on a schedule, say hourly, to read the SFTP folder for new files. Cloud Functions also work for the later part: when new files are dumped into Cloud Storage, a function is triggered to parse the log files and insert the records into BigQuery.

If I stick with the requirement of continuously listening to the SFTP folder, can you suggest a better design, and which combination of GCP services (aside from Cloud Storage and BigQuery) do I need to put together to accomplish this?

And with the less strict requirement, is my solution good enough? P.S. I just found out that the SFTP credentials have read-only privileges, so marking processed files by renaming them with a suffix is out of the question. Should I use a cache like Memorystore to remember which files are done?

Recommended answer

Long read ahead.

From my point of view this is a very big question, and the solution requires not only code development but also a significant amount of design thinking and decisions (including some compromises).

Based on my personal experience (I have developed such a solution twice and maintained it in production), it is possible to use Cloud Functions together with a set of GCP resources - Secret Manager, Pub/Sub topics, Firestore collections, service accounts with the IAM roles they need, and so on. Depending on your requirements (which I don't know in detail) and the context, you might need to create a functional component that consists of a few (let's say between two and five) different Cloud Functions. Two are enough if your files are small (up to 100 MB each), the daily number of files is not large (a few thousand or tens of thousands of files), and you have the right to delete the original files from the SFTP server once they have been downloaded.

If you don't have such rights, there should be some other process that cleans up the 'old' or 'already downloaded' files. Otherwise the solution will eventually stop working - at the point where merely retrieving the list of files (not the files themselves, just the list) takes more than 540 seconds.

SFTP is a 'passive' component - it will not notify us when new files arrive, so there has to be an 'active' component on our side that originates the connection to the SFTP server. It is a 'pull' interaction with some regularity - for example, every 10, 15, or 20 minutes, connect to the SFTP server and check whether there is anything new to download.

Next: Cloud Functions are stateless (and should be idempotent), so it is not possible to keep the state of a file download inside the function itself. There should be an external (relative to the Cloud Functions) service that maintains a state machine of the downloading process for every file. I used Firestore - it is very convenient and has very low latency. Each document in the Firestore collection reflects the 'process of downloading one file' - a state machine together with plenty of metadata, a history of state transitions, and so on.
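To make this more concrete, here is a minimal Python sketch of what one such state document could look like (the collection name, field names, and state values are my illustrative assumptions, not a fixed schema):

```python
# A minimal sketch of one state document; the collection name, field names, and
# state values are illustrative assumptions, not a fixed schema.
from google.cloud import firestore

db = firestore.Client()

state_doc = {
    "connection": "sftp-server-1",            # which SFTP server
    "pipeline": "/var/log/nginx",             # source directory (data pipeline)
    "file_name": "access.log.2020-01-01.gz",
    "file_size": 1048576,
    "file_mtime": 1577848800,                 # as reported by the SFTP server
    "state": "DOWNLOAD_IN_PROGRESS",          # e.g. NEW -> DOWNLOAD_IN_PROGRESS -> DOWNLOADED / FAILED
    "state_history": [
        {"state": "NEW", "at": "2020-01-01T03:15:00Z"},
        {"state": "DOWNLOAD_IN_PROGRESS", "at": "2020-01-01T03:15:02Z"},
    ],
    "gcs_target": "gs://my-log-archive/nginx/access.log.2020-01-01.gz",
}

# The document ID is a hash of the file metadata (shown further below), so the
# same source file always maps to the same state document.
db.collection("sftp-downloads").document("<hash-of-file-metadata>").set(state_doc)
```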

Cloud Functions have two important restrictions:

  1. A 540-second timeout.
  2. 2 GB of memory.

This means that the downloading process (and any other activity) must not exceed 540 seconds, and if you store any data in the cloud function's memory, that chunk of data has to be smaller than 2 GB.

Can the timeout restriction affect the process? Yes, it can. The bottleneck of the whole process is the "bandwidth" between the SFTP server and the GCP point of presence. The bigger the file, the longer it takes to download, especially when many files are being downloaded in parallel.

So, very briefly, the algorithm works in the following way:

1/ The first cloud function is triggered every, let's say, 15 minutes (Cloud Scheduler => Pub/Sub topic => cloud function). The function reads the configuration for all SFTP connections and all data pipelines (for example, a JSON file in a GCS bucket), since this component may work with many SFTP servers, each with many data pipelines. It then gets the credentials from Secret Manager (one secret per SFTP server), connects to each SFTP server, and downloads the list of available files for each connection/pipeline. Thus for every file we know the connection (SFTP server), the pipeline (i.e. the source directory), the file name, the file size, and the file modification timestamp. I would not expect anything more from an SFTP server. For every connection and data pipeline we compose a list of up to 5, 8, or 10 thousand files (the limit depends on the configuration and should be flexible). That list is pushed into a Pub/Sub topic as a JSON message (with some additional metadata if required). So if we have 2 SFTP servers with 3 pipelines each, there will be at least 6 messages - maybe more, if a directory on the SFTP server contains more than 5K, 8K, or 10K files. At this point we do not yet know whether a given file has already been downloaded, is being downloaded, has failed, or is completely new. When this function is deployed, the "max instances" argument is set to 1.
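A rough sketch of that first "lister" function could look like the following. It assumes paramiko for SFTP plus the google-cloud-storage, google-cloud-secret-manager, and google-cloud-pubsub client libraries; the bucket, topic, and secret names are placeholders, and the config layout is my own assumption:

```python
# Sketch of the "lister" function (1st gen Pub/Sub-triggered signature).
# Bucket, topic, and secret names are placeholders; the config format is assumed.
import json
import paramiko
from google.cloud import pubsub_v1, secretmanager, storage

PROJECT = "my-project"                      # placeholder
CONFIG_BUCKET = "my-config-bucket"          # placeholder
LIST_TOPIC = "sftp-file-lists"              # placeholder

def list_new_files(event, context):
    # 1. Read the configuration for all connections/pipelines from GCS.
    cfg_blob = storage.Client().bucket(CONFIG_BUCKET).blob("sftp-config.json")
    config = json.loads(cfg_blob.download_as_text())

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT, LIST_TOPIC)
    secrets = secretmanager.SecretManagerServiceClient()

    for conn in config["connections"]:
        # 2. Fetch the SFTP credentials for this connection from Secret Manager.
        secret_name = f"projects/{PROJECT}/secrets/{conn['secret_id']}/versions/latest"
        password = secrets.access_secret_version(name=secret_name).payload.data.decode()

        # 3. Connect and list the files of every pipeline (source directory).
        transport = paramiko.Transport((conn["host"], conn.get("port", 22)))
        transport.connect(username=conn["user"], password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            for pipeline in conn["pipelines"]:
                entries = sftp.listdir_attr(pipeline["directory"])
                files = [{
                    "connection": conn["name"],
                    "pipeline": pipeline["directory"],
                    "file_name": e.filename,
                    "file_size": e.st_size,
                    "file_mtime": e.st_mtime,
                } for e in entries[:5000]]      # cap the list size per message
                # 4. Push the list as one Pub/Sub message.
                publisher.publish(topic_path, json.dumps({"files": files}).encode()).result()
        finally:
            sftp.close()
            transport.close()
```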

2/ The second cloud function is triggered by a Pub/Sub message that contains a list of files (for some SFTP server and some pipeline). For every file in the incoming list, the cloud function has to decide what is to be done:

  1. This is a new file and should be downloaded.
  2. A download is in progress and we need to wait a bit longer - do nothing.
  3. This file has already been downloaded - do nothing.
  4. A download is in progress but is taking too long - it has probably crashed and the file should be downloaded again.
  5. This is... there may well be more cases to handle...

This is where the Firestore collection is required. Each document in the collection reflects what is happening with one file; everything is recorded there - when the downloading process started, when (or whether) it finished, and so on. The document ID is a hash calculated from the available metadata - connection (SFTP server), pipeline (i.e. source directory), source file name, source file size, and source file modification timestamp. All of that comes from the message.
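For example, the document ID could be computed like this (a sketch; the choice of SHA-256 and the field separator are arbitrary, the point is only that the ID is deterministic):

```python
# Deterministic document ID from the file metadata carried in the message.
# The exact fields and separator are an assumption; what matters is that the
# same source file always maps to the same Firestore document.
import hashlib

def file_doc_id(connection, pipeline, file_name, file_size, file_mtime):
    key = "|".join([connection, pipeline, file_name, str(file_size), str(file_mtime)])
    return hashlib.sha256(key.encode("utf-8")).hexdigest()
```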

So we calculate the hash and check whether such a document exists in the collection. If it does not exist, we create a new document, because this is a completely new file to download, then compose a JSON message and push it into a second Pub/Sub topic - the next cloud function will work on it. If it does exist, we have to decide what to do with it: nothing (because the file has already been downloaded, or because the download is probably still in progress), or trigger the download again - again by composing a JSON message and pushing it into the second Pub/Sub topic...
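Put together, the per-file decision could look roughly like this (the state names, the staleness threshold, and the topic name are illustrative assumptions; file_doc_id is the helper from the previous sketch):

```python
# Sketch of the per-file decision in the second function. State names, topic
# name, and the "stale download" threshold are illustrative assumptions.
import json
import time
from google.cloud import firestore, pubsub_v1

db = firestore.Client()
publisher = pubsub_v1.PublisherClient()
DOWNLOAD_TOPIC = publisher.topic_path("my-project", "sftp-download-requests")  # placeholder
STALE_AFTER_SECONDS = 3600      # assume a download "in progress" for 1h has crashed

def dispatch_file(file_info):
    doc_ref = db.collection("sftp-downloads").document(file_doc_id(**file_info))
    snapshot = doc_ref.get()

    if not snapshot.exists:
        # Case 1: completely new file - create the state document and trigger download.
        doc_ref.set({**file_info, "state": "NEW", "created_at": time.time()})
        publisher.publish(DOWNLOAD_TOPIC, json.dumps(file_info).encode()).result()
        return

    doc = snapshot.to_dict()
    if doc["state"] == "DOWNLOADED":
        return                  # Case 3: already done - nothing to do
    if doc["state"] == "DOWNLOAD_IN_PROGRESS":
        if time.time() - doc.get("started_at", 0) < STALE_AFTER_SECONDS:
            return              # Case 2: still in progress - wait
        # Case 4: looks stale/crashed - mark for retry and trigger the download again.
        doc_ref.update({"state": "RETRY"})
        publisher.publish(DOWNLOAD_TOPIC, json.dumps(file_info).encode()).result()
```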

When this function is deployed, the "max instances" argument is set to a value between 4 and 12 (in my experience).

3/ The third cloud function is triggered by a Pub/Sub message that contains the details of one file to download. The following steps are to be done (a sketch follows the list):

  1. Check that this file is not already being downloaded by another cloud function instance.
  2. Update the Firestore document - the downloading process has started.
  3. Get the configuration details (from the JSON file in GCS).
  4. Get the connection details (from Secret Manager).
  5. Connect and download.
  6. Save the downloaded file into the target GCS bucket.
  7. Update the Firestore document - the downloading process has finished.
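A sketch of that downloader function might look like this. It streams the remote file straight into the target bucket so the file never has to fit within the 2 GB memory limit; the bucket name is a placeholder and lookup_connection stands in for the config/Secret Manager lookup shown in the first sketch:

```python
# Sketch of the "downloader" function, triggered once per file.
# TARGET_BUCKET is a placeholder; lookup_connection is an assumed helper that
# repeats the config + Secret Manager lookup from the first sketch.
import base64
import json
import time
import paramiko
from google.cloud import firestore, storage

db = firestore.Client()
TARGET_BUCKET = "my-log-archive"            # placeholder

def download_file(event, context):
    file_info = json.loads(base64.b64decode(event["data"]).decode())
    doc_ref = db.collection("sftp-downloads").document(file_doc_id(**file_info))

    # Steps 1-2: skip if another instance is already on it, otherwise mark the start.
    snapshot = doc_ref.get()
    if snapshot.exists and snapshot.to_dict().get("state") == "DOWNLOAD_IN_PROGRESS":
        return
    doc_ref.set({"state": "DOWNLOAD_IN_PROGRESS", "started_at": time.time()}, merge=True)

    # Steps 3-5: connection details come from the config and Secret Manager,
    # exactly as in the first function (lookup_connection is an assumed helper).
    host, user, password = lookup_connection(file_info["connection"])
    transport = paramiko.Transport((host, 22))
    transport.connect(username=user, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)

    # Step 6: stream the remote file into the target bucket.
    blob = storage.Client().bucket(TARGET_BUCKET).blob(
        f"{file_info['pipeline'].strip('/')}/{file_info['file_name']}")
    with sftp.open(f"{file_info['pipeline']}/{file_info['file_name']}", "rb") as remote:
        blob.upload_from_file(remote)

    sftp.close()
    transport.close()

    # Step 7: record the success.
    doc_ref.update({"state": "DOWNLOADED", "finished_at": time.time()})
```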

When this function is deployed, the "max instances" argument is set to a value between 10 and 30 (in my experience).

That is a very short description, under the simplest assumptions (i.e. you do not have files larger than 100 MB and/or the connection is good).

Some additional things to keep in mind.

1/ Accurate logging. JSON structures with consistent fields should be logged regularly. I would suggest creating a log sink so that the logs can be analysed in BigQuery tables.
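For example (a minimal sketch; on Cloud Functions a JSON line printed to stdout is ingested as a structured log entry, and the field names here are illustrative):

```python
# Minimal structured-logging sketch: a JSON line on stdout becomes a structured
# log entry, which a log sink can route to a BigQuery table for analysis.
import json

def log_event(severity, pipeline, file_name, state, **extra):
    print(json.dumps({
        "severity": severity,       # mapped to the entry's severity
        "pipeline": pipeline,
        "file_name": file_name,
        "state": state,
        **extra,
    }))

log_event("INFO", "/var/log/nginx", "access.log.gz", "DOWNLOADED", duration_s=12.4)
```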

2/ Service accounts and IAM. All of this should run under a custom service account used only for this component, with the relevant IAM roles granted to it.

3/ Cloud NAT. SFTP servers (in my experience) only accept connections from specific static IP addresses (they will not allow connections from arbitrary addresses). Thus a network, a subnet, an IP address, a router, and a NAT all have to be created and configured. The IP address is given to the SFTP server owner so access can be allowed, and the cloud functions are deployed with the "vpc connector" argument.

4/ Progress and monitoring - three sources of information: the Firestore collection, the Stackdriver logs, and the BigQuery tables.

Again, this is a very simplified description off the top of my head. Let me know if you have specific questions or would like to discuss.

