How to deal with (Apache Beam) high IO bottlenecks?

Question

To cite a simple example, say I have a very simple Beam pipeline which just reads from a file and dumps the data into an output file. Now consider that the input file is huge (some GBs in size, the kind of file you typically can't open in a text editor). Since the direct-runner implementation is quite simple (it reads the whole input set into memory), it won't be able to read and output those huge files (unless you assign an impractically large amount of memory to the Java VM process). So my question is: how do production runners like Flink/Spark/Cloud Dataflow deal with this "huge dataset problem", assuming they don't just try to fit the whole file(s)/dataset into memory?
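
For reference, a minimal sketch of the kind of pipeline I mean, using Beam's Java SDK with TextIO (the file paths are just placeholders):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CopyFilePipeline {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        pipeline
            // Read the (potentially huge) input file line by line.
            .apply("ReadLines", TextIO.read().from("/path/to/huge-input.txt"))
            // Dump the lines into (sharded) output files.
            .apply("WriteLines", TextIO.write().to("/path/to/output"));

        pipeline.run().waitUntilFinish();
      }
    }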

I'd expect production runners' implementations to work "in parts or batches" (i.e. reading/processing/outputting in parts) to avoid trying to fit huge datasets into memory at any specific point in time. Can somebody please share their feedback on how production runners deal with this "huge data" situation?

Generalizing, please note this applies to other input/output mechanisms too. For example, if my input is a PCollection coming from a huge database table (broadly speaking, huge in both row size and row count), does the production runner's internal implementation somehow divide the given input SQL statement into many internally generated sub-statements, each taking a smaller subset (for example by internally generating a count(*) statement, followed by N statements, each taking count(*)/N elements)? The direct-runner won't do this and will just pass the given query 1:1 to the DB. Or is it my responsibility as a developer to "iterate in batches" and divide the problem? And if that is indeed the case, what are the best practices here, i.e.: have one pipeline for this, or many? If only one, somehow parametrise the pipeline to read/write in batches? Or iterate over a simple pipeline and manage the necessary metadata externally to the pipeline?

Thanks in advance, any feedback would be greatly appreciated!

Edit (reflecting David's feedback):

David, your feedback is highly valuable and definitely touches the point I'm interested in. Having a work-discovery phase for splitting a source and a read phase to concurrently read the split partitions is definitely what I was interested in hearing, so thanks for pointing me in the right direction. I have a couple of small follow-up questions, if you don't mind:

1 - Under the section "Generic enumerator-reader communication mechanism", the article points out the following:

"SplitEnumerator 和 SourceReader 都是用户实现的类.实施需要一些沟通的情况并不少见这两个组件之间.为了促进此类用例[....]"

"The SplitEnumerator and SourceReader are both user implemented class. It is not rare that the implementation require some communication between these two components. In order to facilitate such use cases [....]"

So my question here would be: is that "splitting + reading behaviour" triggered by some user-provided (i.e. developer-provided) implementation (specifically of SplitEnumerator and SourceReader), or can I benefit from it out of the box without any custom code?

2 - Probably just delving deeper into the question above: if I have a batch/bounded workload (let's say I'm using Apache Flink), and I'm interested in processing a "huge file" as described in the original post, will the pipeline work "out of the box" (doing the behind-the-scenes "work preparation phase" splits and the parallel reads), or would that require some custom code implemented by the developer?

Thanks in advance for all your valuable feedback!

Answer

Just to provide some closure to this question: the point of asking was to find out whether Apache Beam, when coupled with a production runner (like Flink, Spark, or Google Cloud Dataflow), offers out-of-the-box mechanisms for splitting work (i.e. reading/writing/manipulating huge files, or data sources in general). The comment provided by David Anderson above proved of great value in hinting at how Apache Flink deals with these workflows.

At this point I've implemented solutions using huge files (to test possible IO bottlenecks) with a "Beam on Flink" based pipeline, and I can confirm that Flink will create an execution plan which includes splitting sources and dividing work in such a way that no memory problem arises. There can of course be conditions under which stability/"IO performance" is compromised, but at least I can confirm that the workflows carried out behind the pipeline abstraction use the filesystem when carrying out tasks, avoiding fitting all data in memory and thus avoiding trivial out-of-memory errors. Conclusion: yes, "Beam on Flink" (and likely Spark and Dataflow too) does offer proper work preparation, work splitting, and filesystem usage, so that the available volatile memory is used efficiently.
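
For illustration, moving such a pipeline to the Flink runner is mostly a matter of pipeline options; the source splitting and execution planning described above are handled by the runner, not by the pipeline code. A minimal sketch (it assumes the beam-runners-flink dependency matching your Flink version is on the classpath, and the parallelism value is arbitrary):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CopyFileOnFlink {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setParallelism(8); // arbitrary value; tune to the cluster

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply("ReadLines", TextIO.read().from("/path/to/huge-input.txt"))
            .apply("WriteLines", TextIO.write().to("/path/to/output"));
        pipeline.run().waitUntilFinish();
      }
    }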

Update about data sources: regarding DBs as data sources, Flink won't (and can't, it is not trivial) optimize/split/distribute work related to DB data sources the same way it optimizes reading from the filesystem. There are still approaches to read huge amounts of data (records) from a DB, but the implementation details need to be addressed by the developer instead of being the responsibility of the framework. I found this article (https://nl.devoteam.com/expert-view/querying-jdbc-database-in-parallel-with-google-dataflow-apache-beam/) very helpful in addressing the point of reading massive amounts of records from a DB in Beam (the article uses the Cloud Dataflow runner, but I used Flink and it worked just fine): splitting queries and distributing the processing.
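
To make that approach concrete, here is a minimal sketch of the pattern in Beam's Java SDK: a set of key ranges is created up front, and JdbcIO.readAll() issues one parameterized query per range, so the ranges can be read in parallel. The table name, key column, connection details and the hard-coded max id are all hypothetical; in practice you would derive the ranges from a count or max query first:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;

    public class ParallelJdbcRead {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Pre-compute N contiguous key ranges over a (hypothetical) numeric primary key `id`.
        long maxId = 10_000_000L;   // assumed upper bound; fetch it from the DB in practice
        int numRanges = 100;
        long step = maxId / numRanges + 1;
        List<KV<Long, Long>> ranges = new ArrayList<>();
        for (int i = 0; i < numRanges; i++) {
          ranges.add(KV.of(i * step, (i + 1) * step));
        }

        JdbcIO.DataSourceConfiguration dbConfig =
            JdbcIO.DataSourceConfiguration.create(
                    "org.postgresql.Driver", "jdbc:postgresql://host:5432/mydb")
                .withUsername("user")
                .withPassword("password");

        pipeline
            .apply("KeyRanges",
                Create.of(ranges).withCoder(KvCoder.of(VarLongCoder.of(), VarLongCoder.of())))
            // readAll() runs one parameterized query per incoming range,
            // so the ranges are read concurrently across workers.
            .apply("ReadRanges", JdbcIO.<KV<Long, Long>, String>readAll()
                .withDataSourceConfiguration(dbConfig)
                .withQuery("SELECT payload FROM my_table WHERE id >= ? AND id < ?")
                .withParameterSetter((range, stmt) -> {
                  stmt.setLong(1, range.getKey());
                  stmt.setLong(2, range.getValue());
                })
                .withRowMapper(rs -> rs.getString("payload"))
                .withCoder(StringUtf8Coder.of()))
            .apply("WriteResults", TextIO.write().to("/tmp/db-export"));

        pipeline.run().waitUntilFinish();
      }
    }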
