Python Apache Beam Side输入断言错误 [英] Python Apache Beam Side Input Assertion Error
问题描述
我还是Apache Beam/Cloud Dataflow的新手,如果我的理解不正确,我深表歉意.
I am still new to Apache Beam/Cloud Dataflow so I apologize if my understanding is not correct.
我正在尝试通过管道读取大约30,000行的数据文件.我的简单管道首先从GCS打开了csv,从数据中提取了标头,通过ParDo/DoFn函数运行了数据,然后将所有输出写回到csv中回到GCS.该管道有效,这是我的第一个测试.
I am trying to read a data file, ~30,000 rows long, through a pipeline. My simple pipeline first opened the csv from GCS, pulled the headers out of the data, ran the data through a ParDo/DoFn function, and then wrote all of the output into a csv back into GCS. This pipeline worked and was my first test.
然后我编辑管道以读取csv,拉出标题,从数据中删除标题,通过ParDo/DoFn函数以标题作为侧面输入运行数据,然后将所有输出写入CSV.唯一的新代码是将标头作为侧面输入传递并从数据中过滤出来.
I then edited the pipeline to read the csv, pull out the headers, remove the headers from the data, run the data through the ParDo/DoFn function with the headers as a side input, and then write all of the output into a csv. The only new code was passing the headers in as a side input and filtering it from the data.
ParDo/DoFn函数build_rows只是产生context.element,这样我可以确保我的辅助输入正常工作.
The ParDo/DoFn function build_rows just yields the context.element so that I could make sure my side inputs were working.
我得到的错误如下:
我不确定是什么问题,但我认为可能是由于内存限制.我将示例数据从30,000行缩减为100行,我的代码终于可以正常工作了.
The error I get is below:
I am not exactly sure what the issue is but I think it may be due to a memory limit. I trimmed my sample data down from 30,000 rows to 100 rows and my code finally worked.
没有侧面输入的管道确实读取/写入了所有30,000行,但最后我将需要侧面输入对数据进行转换.
The pipeline without the side inputs does read/write all 30,000 rows but in the end I will need the side inputs to do transformations on my data.
如何修复管道,以便可以处理来自GCS的大型csv文件,并且仍将侧面输入用作文件的伪全局变量?
How do I fix my pipeline so that I can process large csv files from GCS and still use side inputs as a pseudo global variable for the file?
推荐答案
我最近编码了 Apache Beam的CSV文件源,我已将其添加到beam_utils
PiPy软件包中.具体来说,您可以按以下方式使用它:
I recently coded a CSV file source for Apache Beam, and I've added it to the beam_utils
PiPy package. Specifically, you can use it as follows:
- 安装Beam utils:
pip install beam_utils
- 导入:
from beam_utils.sources import CsvFileSource
. - 将其用作来源:
beam.io.Read(CsvFileSource(input_file))
.
- Install beam utils:
pip install beam_utils
- Import:
from beam_utils.sources import CsvFileSource
. - Use it as a source:
beam.io.Read(CsvFileSource(input_file))
.
CsvFileSource
的默认行为是返回按标头索引的字典-但您可以查看文档来确定要使用的选项.
In its default behavior, the CsvFileSource
returns dictionaries indexed by header - but you can take a look at the documentation to decide what option you'd like to use.
另外,如果要实现自己的自定义CsvFileSource
,则需要将Beam的FileBasedSource
子类化:
As an extra, if you want to implement your own custom CsvFileSource
, you need to subclass Beam's FileBasedSource
:
import csv
class CsvFileSource(beam.io.filebasedsource.FileBasedSource):
def read_records(self, file_name, range_tracker):
self._file = self.open_file(file_name)
reader = csv.reader(self._file)
for i, rec in enumerate(reader):
yield res
您可以扩展此逻辑以分析标头和其他特殊行为.
And you can expand this logic to parse for headers and other special behavior.
此外,请注意,由于需要顺序解析此源,因此无法拆分,因此在处理数据时它可能表示瓶颈(尽管可以).
Also, as a note, this source can not be split because it needs to be sequentially parsed, so it may represent a bottleneck when processing data (though that may be okay).
这篇关于Python Apache Beam Side输入断言错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!