How to read large CSV with Beam?


Problem description

I'm trying to figure out how to use Apache Beam to read large CSV files. By "large" I mean several gigabytes, so that it would be impractical to read the entire CSV into memory at once.

So far, I've tried the following options:

  • Use TextIO.read(): this is no good because a quoted CSV field could contain a newline. In addition, this tries to read the entire file into memory at once.
  • Write a DoFn that reads the file as a stream and emits records (e.g. with commons-csv); a rough sketch of this idea appears after this list. However, this still reads the entire file all at once.
  • Try a SplittableDoFn as described here. My goal with this is to have it gradually emit records as an unbounded PCollection - basically, to turn my file into a stream of records. However, (1) it's hard to get the counting right, (2) it requires some hacky synchronization since ParDo creates multiple threads, and (3) my resulting PCollection still isn't unbounded.
  • Try to create my own UnboundedSource. This seems to be ultra-complicated and poorly documented (unless I'm missing something?).
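
For concreteness, here is only a rough sketch of the streaming-DoFn idea, assuming FileIO.readMatches() as the source and commons-csv for parsing; the file pattern, class names, and row representation are illustrative assumptions, not the original code:

```java
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class StreamCsvExample {

  /** Opens one matched file as a stream and emits one CSV row at a time. */
  static class CsvRowsFn extends DoFn<FileIO.ReadableFile, List<String>> {
    @ProcessElement
    public void process(@Element FileIO.ReadableFile file,
                        OutputReceiver<List<String>> out) throws Exception {
      try (Reader reader = new InputStreamReader(
              Channels.newInputStream(file.open()), StandardCharsets.UTF_8);
           // CSVFormat.DEFAULT handles quoted fields, including embedded newlines
           CSVParser parser = CSVFormat.DEFAULT.parse(reader)) {
        for (CSVRecord record : parser) {   // the parser iterates lazily, record by record
          List<String> row = new ArrayList<>();
          record.forEach(row::add);         // CSVRecord is Iterable<String>
          out.output(row);                  // only the current record is held in memory
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<List<String>> rows =
        pipeline
            .apply(FileIO.match().filepattern("/path/to/big-*.csv"))  // illustrative path
            .apply(FileIO.readMatches())
            .apply(ParDo.of(new CsvRowsFn()))
            .setCoder(ListCoder.of(StringUtf8Coder.of()));

    // ... downstream transforms would go here ...

    pipeline.run().waitUntilFinish();
  }
}
```

Note that even with this, each matched file is processed by a single worker and the resulting PCollection is still bounded; only the parsing itself is streamed record by record.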

Does Beam provide anything simple to allow me to parse a file the way I want, without having to read the entire file into memory before moving on to the next transform?

Recommended answer

TextIO should be doing the right thing from Beam's perspective, which is reading the text file as fast as possible and emitting events to the next stage.
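
For reference, a minimal TextIO usage looks roughly like this (the file pattern is a placeholder):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class TextIoExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // TextIO emits one line per element and does not need the whole file in memory,
    // but it cannot handle quoted CSV fields that span multiple lines.
    PCollection<String> lines = pipeline.apply(TextIO.read().from("/path/to/big-*.csv"));

    pipeline.run().waitUntilFinish();
  }
}
```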


I'm guessing you are using the DirectRunner for this, which is why you are seeing a large memory footprint. Hopefully this isn't too much explanation: the DirectRunner is a test runner for small jobs, so it buffers intermediate steps in memory rather than writing them to disk. If you are still testing your pipeline, you should use a small sample of your data until you think it is working. Then you can use the Apache Flink runner or the Google Cloud Dataflow runner, both of which will write intermediate stages to disk when needed.
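
As an illustration, the runner is normally chosen through standard pipeline options rather than code changes; the flags below are the usual ones and the values are placeholders:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerChoiceExample {
  public static void main(String[] args) {
    // The runner is picked from command-line flags, e.g.
    //   --runner=DirectRunner                      (local testing; buffers in memory)
    //   --runner=FlinkRunner                       (plus Flink-specific options)
    //   --runner=DataflowRunner --project=<p> --region=<r> --tempLocation=gs://<bucket>/tmp
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);

    // ... build the same transforms as before, then run:
    pipeline.run().waitUntilFinish();
  }
}
```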

