Python Apache Beam Side Input Assertion Error

Problem Description

I am still new to Apache Beam/Cloud Dataflow so I apologize if my understanding is not correct.

I am trying to read a data file, ~30,000 rows long, through a pipeline. My simple pipeline first opened the csv from GCS, pulled the headers out of the data, ran the data through a ParDo/DoFn function, and then wrote all of the output into a csv back into GCS. This pipeline worked and was my first test.
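
A minimal sketch of that first pipeline (the bucket paths and the BuildRowsFn transform are hypothetical stand-ins, since the question does not include the actual code):

import apache_beam as beam

class BuildRowsFn(beam.DoFn):
  def process(self, element):
    # Placeholder per-row transform; the real logic was not shown.
    yield element

with beam.Pipeline() as p:
  (p
   | 'ReadCsv' >> beam.io.ReadFromText('gs://my-bucket/input.csv')
   | 'BuildRows' >> beam.ParDo(BuildRowsFn())
   | 'WriteCsv' >> beam.io.WriteToText('gs://my-bucket/output',
                                       file_name_suffix='.csv'))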

I then edited the pipeline to read the csv, pull out the headers, remove the headers from the data, run the data through the ParDo/DoFn function with the headers as a side input, and then write all of the output into a csv. The only new code was passing the headers in as a side input and filtering it from the data.

The ParDo/DoFn function build_rows just yields the context.element so that I could make sure my side inputs were working.
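
Putting those pieces together, the side-input version might look roughly like the sketch below. Everything here is a hypothetical reconstruction (paths, the known column name, transform names); it assumes the header row can be recognized by its first column, and it uses beam.pvalue.AsSingleton as one way to hand the header to the DoFn. In the current DoFn API the element and side inputs arrive as arguments to process; older SDKs exposed the element via context.element instead, as the question describes.

import apache_beam as beam

class BuildRowsFn(beam.DoFn):
  def process(self, element, headers):
    # 'headers' arrives via the side input; for this test, just
    # re-emit the row unchanged, as the question's build_rows did.
    yield element

with beam.Pipeline() as p:
  lines = p | 'ReadCsv' >> beam.io.ReadFromText('gs://my-bucket/input.csv')

  # Assumption: the header is the single line starting with a known column name.
  is_header = lambda line: line.startswith('col_a,')

  header = lines | 'GrabHeader' >> beam.Filter(is_header)
  data = lines | 'DropHeader' >> beam.Filter(lambda line: not is_header(line))

  (data
   | 'BuildRows' >> beam.ParDo(
       BuildRowsFn(), headers=beam.pvalue.AsSingleton(header))
   | 'WriteCsv' >> beam.io.WriteToText('gs://my-bucket/output',
                                       file_name_suffix='.csv'))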

The error I get is below:

I am not exactly sure what the issue is but I think it may be due to a memory limit. I trimmed my sample data down from 30,000 rows to 100 rows and my code finally worked.

The pipeline without the side inputs does read/write all 30,000 rows, but in the end I will need the side inputs to do transformations on my data.

How do I fix my pipeline so that I can process large csv files from GCS and still use side inputs as a pseudo global variable for the file?

Recommended Answer

I recently coded a CSV file source for Apache Beam, and I've added it to the beam_utils PyPI package. Specifically, you can use it as follows:

  1. Install beam_utils: pip install beam_utils
  2. Import it: from beam_utils.sources import CsvFileSource.
  3. Use it as a source: beam.io.Read(CsvFileSource(input_file)), as in the sketch after this list.
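
A minimal end-to-end sketch using that source (bucket paths are hypothetical; per the answer, each element is a dict keyed by the CSV header by default):

import apache_beam as beam
from beam_utils.sources import CsvFileSource

with beam.Pipeline() as p:
  (p
   | 'ReadCsv' >> beam.io.Read(CsvFileSource('gs://my-bucket/input.csv'))
   | 'Format' >> beam.Map(str)  # rows are dicts keyed by header; stringify for writing
   | 'Write' >> beam.io.WriteToText('gs://my-bucket/output'))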

In its default behavior, the CsvFileSource returns dictionaries indexed by header - but you can take a look at the documentation to decide what option you'd like to use.

As an extra, if you want to implement your own custom CsvFileSource, you need to subclass Beam's FileBasedSource:

import csv
import apache_beam as beam

class CsvFileSource(beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    # open_file handles GCS paths and compression transparently.
    self._file = self.open_file(file_name)
    reader = csv.reader(self._file)
    for rec in reader:
      yield rec  # one list of fields per CSV row

And you can expand this logic to parse for headers and other special behavior.
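
One possible extension, sketched under the same assumptions (the class name DictCsvFileSource is made up for illustration; it treats the first record as the header and emits one dict per data row, and it marks the source unsplittable for the reason noted below):

import csv
import io
import apache_beam as beam

class DictCsvFileSource(beam.io.filebasedsource.FileBasedSource):
  def __init__(self, file_pattern):
    # Sequential parsing, so mark the source as unsplittable.
    super().__init__(file_pattern, splittable=False)

  def read_records(self, file_name, range_tracker):
    # open_file returns a binary handle; wrap it for the text-based csv module.
    reader = csv.reader(io.TextIOWrapper(self.open_file(file_name)))
    headers = None
    for rec in reader:
      if headers is None:
        headers = rec  # first row holds the column names
        continue
      yield dict(zip(headers, rec))  # one dict per data row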

Also, as a note, this source cannot be split because it needs to be sequentially parsed, so it may represent a bottleneck when processing data (though that may be okay).
