Google Cloud Platform solution for serverless log ingestion (files downloading) from a SFTP server

Problem Description

There was a question here today which was deleted (I don't know why) while I was typing the answer. As the answer is long, I decided to recreate it and post my answer anyway. Maybe it will be useful for somebody.


Here is the original question:

We have an SFTP server where Apache, Nginx, and Wordpress logs are dumped. We want to back up those logs into Cloud Storage and, at the same time, parse their contents and insert them into BigQuery tables. I usually work with Cloud Functions (either NodeJS or Python), and that is what first comes to mind as a go-to solution.

However, a Cloud Function needs a trigger, which does not make sense if my objective is for a program to continuously watch/observe/listen on the SFTP folder for new files. If my requirement is less strict, I can trigger it on a schedule, say hourly, to read the SFTP folder for new files. Cloud Functions also covers the part where new files are dumped into Cloud Storage, triggering a function to parse the log files and insert the results into BigQuery.

If I stick with the requirement of continuously listening to the SFTP folder, can you suggest a better design solution and what combination of GCP services (aside from Cloud Storage and BigQuery) I need to put together in order to accomplish this?

And with the less strict requirement, is my solution good enough? P.S. I just found out that the SFTP credentials have read-only privileges, so marking finished files by renaming them with a suffix is out of the question. Should I use a cache like MemoryStore to remember which files are done?

Solution

Longread.

From my point of view this is a very big question, and the solution requires not only code development but also a significant amount of design thinking and decisions (including some compromises).

Based on my personal experience (I developed such a solution twice and maintained it in production), it is possible to use Cloud Functions together with a set of GCP resources - Secret Manager, PubSub topics, Firestore collections, service accounts and IAM for them, and so on. Depending on your requirements (which I don't know in detail) and the context, you might need to create a functional component that consists of a few (let's say between two and five) different cloud functions. Two - if your files are small (up to 100M each), the daily number of files is not big (a few thousand or tens of thousands of files), and you have the right to delete the original files from the SFTP server after downloading them.

If you don't have such rights - there should be some other process which cleans up the 'old' or 'already downloaded' files. Otherwise the solution will eventually stop working (once merely fetching the directory listing - not the files themselves, just the list of files - takes more than 540 seconds).

SFTP is 'passive': it won't inform us when new files arrive, so there should be some 'active' component on our side to originate a connection to the SFTP server. It is a 'pull' interaction done with some regularity - i.e. every 10, 15, or 20 minutes - connect to the SFTP server and check if there is anything new to download.

Next: Cloud Functions are stateless, and it is not possible to store/keep the state of a file download just within a cloud function. There should be some external (relative to the Cloud Functions) service to maintain the state machine of the downloading process for every file. I used Firestore. It is very convenient and has very small latency. Each document in the Firestore collection is a reflection of the 'process of downloading a file' - a state machine together with plenty of metadata, the history of state transitions, and so on.
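
As a rough illustration of such a document, here is a minimal sketch in Python; the collection name, field names and state values are my assumptions for this example, not a schema from the original design:

```python
# Minimal sketch of one "file download" state-machine document in Firestore.
# Collection, field and state names are illustrative assumptions only.
from google.cloud import firestore

DOWNLOAD_STATES = ("NEW", "DOWNLOADING", "DONE", "FAILED")  # assumed states

def create_download_doc(db: firestore.Client, doc_id: str, meta: dict) -> None:
    """Create the state-machine document for one file (fails if it already exists)."""
    db.collection("file_downloads").document(doc_id).create({
        "connection": meta["connection"],   # SFTP server id
        "pipeline": meta["pipeline"],       # source directory / data pipeline
        "file_name": meta["file_name"],
        "file_size": meta["file_size"],
        "file_mtime": meta["file_mtime"],
        "state": "NEW",
        "state_history": [],                # state transitions appended over time
        "created_at": firestore.SERVER_TIMESTAMP,
    })
```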

Cloud Functions have 2 important restrictions:

  1. 540 seconds timeout.
  2. 2 GB of memory.

It means that the downloading process (and any other activities) should not exceed 540 seconds. And if you are going to store any data in the memory of the cloud function, that data chunk should be smaller than 2 GB.

Can the timeout restriction affect the process? Yes, it can. The bottleneck of the whole process is the "bandwidth" between the SFTP server and the GCP point of presence. The bigger the file, the longer it takes to download, especially when many files are being downloaded in parallel. For example, at an effective throughput of 1 MB/s a 100 MB file already takes well over a minute, so a function that downloads several such files sequentially can approach the 540-second limit.

So, in short, the algorithm works in the following way:

1/ The first cloud function is triggered every, let's say, 15 minutes (Cloud Scheduler => PubSub topic => cloud function). The cloud function reads the configuration (i.e. a json file from a GCS bucket) for all SFTP connections and all data pipelines (as this component might work with many SFTP servers, and for each SFTP server there may be many data pipelines), then gets credentials from the Secret Manager (for each SFTP server), then connects to the SFTP server and downloads the list of available files for each connection/pipeline. Thus for every file we know the connection (SFTP server), pipeline (i.e. source directory), file name, file size, and file modification timestamp. I would not expect anything more from the SFTP server. For every connection and data pipeline we compose a list of up to 5, 8, or 10 thousand files (the limit depends on configuration and should be flexible). That list is pushed into a PubSub topic as a json message (with some additional metadata if required). So, if we have 2 SFTP servers and 3 pipelines in each of them, there will be at least 6 messages. There may be more, if a directory on the SFTP server contains more than 5K, 8K, or 10K files. At this point, we don't know whether a given file was already downloaded, whether its download is in progress or has failed, or whether it is a new file. When this function is deployed, the "max instances" argument has the value 1.
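
A minimal sketch of this first function, assuming Python, paramiko for SFTP, and the google-cloud-storage / secret-manager / pubsub client libraries. The project, bucket, topic and secret names, the config layout and the batch size are all assumptions made for illustration, not values from the original answer:

```python
import json

import paramiko
from google.cloud import pubsub_v1, secretmanager, storage

PROJECT = "my-project"               # assumed project id
FILE_LIST_TOPIC = "sftp-file-lists"  # assumed topic for the lists of files
BATCH_SIZE = 5000                    # "5, 8 or 10 thousand" files per message

# Assumed config layout (sftp_pipelines.json in a GCS bucket):
# {"connections": [{"name": "...", "host": "...", "user": "...", "secret": "...",
#                   "pipelines": [{"name": "...", "source_dir": "..."}]}]}

def list_new_files(event, context):
    """Entry point, triggered every ~15 minutes via Cloud Scheduler => PubSub."""
    config = json.loads(
        storage.Client().bucket("my-config-bucket")
        .blob("sftp_pipelines.json").download_as_text())

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT, FILE_LIST_TOPIC)
    secrets = secretmanager.SecretManagerServiceClient()

    for conn in config["connections"]:
        # Credentials for this SFTP server come from Secret Manager.
        secret_name = f"projects/{PROJECT}/secrets/{conn['secret']}/versions/latest"
        password = secrets.access_secret_version(
            request={"name": secret_name}).payload.data.decode("utf-8")

        transport = paramiko.Transport((conn["host"], conn.get("port", 22)))
        transport.connect(username=conn["user"], password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)

        for pipeline in conn["pipelines"]:
            # List the directory: name, size and mtime is all we get from SFTP.
            files = [{
                "connection": conn["name"],
                "pipeline": pipeline["name"],
                "file_name": attr.filename,
                "file_size": attr.st_size,
                "file_mtime": attr.st_mtime,
            } for attr in sftp.listdir_attr(pipeline["source_dir"])]

            # Push the list in batches, so one message never grows too large.
            for i in range(0, len(files), BATCH_SIZE):
                message = {"connection": conn["name"],
                           "pipeline": pipeline["name"],
                           "files": files[i:i + BATCH_SIZE]}
                publisher.publish(
                    topic_path, data=json.dumps(message).encode("utf-8")).result()

        sftp.close()
        transport.close()
```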

2/ The second cloud function is triggered by a PubSub message which contains a list of files (for some SFTP server and some pipeline). For every file in the incoming list, the cloud function should decide what is to be done:

  1. This is a new file, and it should be downloaded.
  2. This is a download in progress and we need to wait longer - do nothing.
  3. This is an already downloaded file and we do nothing.
  4. This is a download in progress, but it is taking too long - the download probably crashed, and it should be started again.
  5. This is ... there may be more cases to handle...

Now the Firestore collection is required. Each document in the collection is a reflection of what happens with the file; everything is recorded there - when the downloading process started, when (or if) it finished, and so on. The document ID is a hash calculated from the available metadata - connection (SFTP server), pipeline (i.e. source directory), source file name, source file size, source file modification timestamp. All of that comes from the message.

For example, we calculate the hash and check whether such a document exists in the collection. If it does not exist, we create a new document, as this is a completely new file to download, then compose a json message and push it into the second PubSub topic - the next cloud function is going to work on it. If it exists, it is necessary to decide what to do with it: do nothing (because it is already downloaded, or because the download is probably still in progress) or trigger its download again - compose a json message and push it into the second PubSub topic...
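
A sketch of that decision logic, under the same caveats; the collection, topic and field names and the "stale download" threshold are assumptions for illustration:

```python
import base64
import hashlib
import json
import time

from google.cloud import firestore, pubsub_v1

PROJECT = "my-project"                     # assumed project id
DOWNLOAD_TOPIC = "sftp-files-to-download"  # assumed second topic
STALE_AFTER_SECONDS = 3600                 # assumed "takes too long" threshold

def file_doc_id(f: dict) -> str:
    """Deterministic document ID hashed from the metadata carried in the message."""
    key = "|".join(str(f[k]) for k in
                   ("connection", "pipeline", "file_name", "file_size", "file_mtime"))
    return hashlib.sha256(key.encode("utf-8")).hexdigest()

def dispatch_files(event, context):
    """Entry point, triggered by a file-list message from the first function."""
    message = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

    db = firestore.Client()
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT, DOWNLOAD_TOPIC)

    for f in message["files"]:
        doc_ref = db.collection("file_downloads").document(file_doc_id(f))
        snapshot = doc_ref.get()
        request = json.dumps({**f, "doc_id": doc_ref.id}).encode("utf-8")

        if not snapshot.exists:
            # Case 1: a completely new file - record it and request a download.
            doc_ref.set({**f, "state": "NEW",
                         "created_at": firestore.SERVER_TIMESTAMP})
            publisher.publish(topic_path, data=request).result()
            continue

        doc = snapshot.to_dict()
        if doc["state"] == "DONE":
            continue  # Case 3: already downloaded - do nothing.
        if doc["state"] == "DOWNLOADING":
            started = doc.get("download_started_at")
            if started and time.time() - started.timestamp() < STALE_AFTER_SECONDS:
                continue  # Case 2: still in progress - wait more.
            # Case 4: it has been "in progress" for too long - request it again.
            publisher.publish(topic_path, data=request).result()
```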

When this function is deployed, the "max instances" argument has a value between 4 and 12 (in my experience).

3/ The third cloud function is triggered by a PubSub message which contains the details of one file to download. The following steps are to be done:

  1. Check that this file is not already being downloaded by another cloud function
  2. Update the Firestore document - we start the downloading process
  3. Get configuration details (from the json file in GCS)
  4. Get connection details (from the Secret Manager)
  5. Connect and download
  6. Save the downloaded file into the target GCS bucket
  7. Update the Firestore document - we finished the downloading process

When this function is deployed, the "max instances" argument has a value between 10 and 30 (in my experience).
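
A sketch of this third function, with the same caveats; here the incoming message is assumed to carry the Firestore document ID plus the connection details (host, user, secret name, source directory) alongside the file metadata, and the target bucket name is made up for the example:

```python
import base64
import json
import os

import paramiko
from google.cloud import firestore, secretmanager, storage

PROJECT = "my-project"            # assumed project id
TARGET_BUCKET = "my-log-archive"  # assumed target GCS bucket

def download_file(event, context):
    """Entry point, triggered by a message describing one file to download."""
    f = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

    db = firestore.Client()
    doc_ref = db.collection("file_downloads").document(f["doc_id"])

    # Steps 1-2: claim the file. A real implementation would do this inside a
    # Firestore transaction and abort if the state is already DOWNLOADING.
    doc_ref.update({"state": "DOWNLOADING",
                    "download_started_at": firestore.SERVER_TIMESTAMP})

    # Steps 3-4: configuration and credentials (same pattern as in the first function).
    secret_name = f"projects/{PROJECT}/secrets/{f['secret']}/versions/latest"
    password = secretmanager.SecretManagerServiceClient().access_secret_version(
        request={"name": secret_name}).payload.data.decode("utf-8")

    # Step 5: connect and download into /tmp (which counts against function memory).
    transport = paramiko.Transport((f["host"], f.get("port", 22)))
    transport.connect(username=f["user"], password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    local_path = os.path.join("/tmp", f["file_name"])
    sftp.get(f["source_dir"] + "/" + f["file_name"], local_path)
    sftp.close()
    transport.close()

    # Step 6: save the file into the target GCS bucket.
    blob_name = "/".join([f["connection"], f["pipeline"], f["file_name"]])
    storage.Client().bucket(TARGET_BUCKET).blob(blob_name).upload_from_filename(local_path)

    # Step 7: record the successful download.
    doc_ref.update({"state": "DONE",
                    "download_finished_at": firestore.SERVER_TIMESTAMP})
```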

That is a very short description under the simplest assumptions (i.e. you don't have files larger than 100 MB and/or the connection is good).

Some additional things to have in mind.

1/ Accurate logging. Json structures with consistent fields are to be logged regularly. I would suggest creating a log sink, so the logs can be analysed in BigQuery tables.
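
For example, a tiny helper along these lines (the field names are just an illustration); single-line JSON written to stdout is generally picked up by Cloud Logging as structured jsonPayload, which a sink can then route to BigQuery:

```python
import json
import sys
import time

def log_event(event_type: str, **fields) -> None:
    """Emit one structured log line with a consistent set of fields."""
    record = {"event": event_type, "ts": time.time(), **fields}
    print(json.dumps(record), file=sys.stdout, flush=True)

# Example usage inside the downloading function (values are made up):
# log_event("download_finished", connection="sftp-1", pipeline="nginx",
#           file_name="access.log.gz", file_size=1048576, duration_sec=12.3)
```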

2/ Service Accounts and IAM. All of that should run under a custom service account used only for the given component. Relevant IAM roles are to be provided.

3/ Cloud NAT. SFTP servers (in my experience) allow connections only from specific static IP addresses (they won't allow connections from just any address). Thus, a network, subnet, IP address, router, NAT - all of that is to be created and configured. The IP address is to be provided to the SFTP server owner, to allow access. The cloud functions are to be deployed with the "vpc connector" argument.

4/ Progress and monitoring - 3 sources of information: the Firestore collection, Stackdriver logs, and BigQuery tables.

Again, this is a very simplified description off the top of my head. Let me know if you have specific questions or would like to discuss.
