How to convert a CSV into a dictionary in Apache Beam Dataflow

Problem Description

I would like to read a CSV file and write it to BigQuery using Apache Beam Dataflow. In order to do this I need to present the data to BigQuery in the form of a dictionary. How can I transform the data using Apache Beam to do this?

My input CSV file has two columns, and I want to create a corresponding two-column table in BigQuery. I know how to load data into BigQuery; that's straightforward. What I don't know is how to transform the CSV into a dictionary. The code below is not correct, but it should give an idea of what I'm trying to do.

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Read the CSV, convert each row to a dictionary, and write it to BigQuery.
(p
 | 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
 # How do you do this??
 | 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
 | 'save' >> beam.Write(
     beam.io.BigQuerySink(
         output_table,
         schema='month:INTEGER, tornado_count:INTEGER',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()

Recommended Answer

As of version 2.12.0, Beam comes with new fileio transforms that allow you to read from CSV without having to reimplement a source. You can do it like so:

import csv
import io
import sys

import apache_beam as beam
import apache_beam.io.fileio  # binds the submodule so beam.io.fileio resolves
from apache_beam import Pipeline

def get_csv_reader(readable_file):
  # You can return whichever kind of reader you want here:
  # a DictReader, or a plain csv.reader.
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with Pipeline(...) as p:
  content_pc = (p
                | beam.io.fileio.MatchFiles("/my/file/name")
                | beam.io.fileio.ReadMatches()
                | beam.Reshuffle()  # Useful if you expect many matches
                | beam.FlatMap(get_csv_reader))
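
To get from those parsed rows to what the question asks for, the rows can then be mapped to dictionaries and written to BigQuery. A minimal sketch of that continuation, assuming the two columns are luminosity and datetime as in the question (the table name and schema here are illustrative, not from the original answer):

  # Continues inside the `with Pipeline(...)` block above. Each element of
  # content_pc is a list of column values produced by csv.reader.
  (content_pc
   | 'convert to dictionary' >> beam.Map(
       lambda row: {'luminosity': row[0], 'datetime': row[1]})
   | 'save' >> beam.io.WriteToBigQuery(
       'my_project:my_dataset.my_table',  # illustrative table name
       schema='luminosity:FLOAT, datetime:TIMESTAMP',
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))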

I recently wrote a test of this fileio approach for Apache Beam; you can take a look at the Github repository.

The old answer relied on reimplementing a source. This is no longer the main recommended way of doing this : )

The idea is to have a source that returns parsed CSV rows. You can do this by subclassing the FileBasedSource class to include CSV parsing. In particular, the read_records function would look something like this:

import csv

import apache_beam.io.filebasedsource


class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    # Note: this simple version ignores the range_tracker, so the
    # file will not be split and read in parallel.
    self._file = self.open_file(file_name)

    reader = csv.reader(self._file)

    for rec in reader:
      yield rec
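
A pipeline would then consume this source with beam.io.Read. A minimal usage sketch, reusing the sensor file and column names from the question (the step labels are illustrative):

import apache_beam as beam

with beam.Pipeline() as p:
  rows = (p
          | 'read csv' >> beam.io.Read(MyCsvFileSource('./sensor1_121116.csv'))
          | 'to dict' >> beam.Map(
              lambda row: {'luminosity': row[0], 'datetime': row[1]}))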
