How to convert csv into a dictionary in apache beam dataflow


Question



I would like to read a csv file and write it to BigQuery using apache beam dataflow. In order to do this I need to present the data to BigQuery in the form of a dictionary. How can I transform the data using apache beam in order to do this?

My input csv file has two columns, and I want to create a corresponding two-column table in BigQuery. I know how to create data in BigQuery, that's straightforward; what I don't know is how to transform the csv into a dictionary. The code below is not correct but should give an idea of what I'm trying to do.

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
    beam.io.BigQuerySink(
        output_table,
        schema='month:INTEGER, tornado_count:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
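
For the step marked "How do you do this??", here is a minimal sketch of one possible conversion, assuming each element emitted by the text source is a single raw csv line (the helper below is hypothetical, not from the original post; column names are taken from the question):

# Hypothetical helper: split one raw csv line into the two expected
# columns and build the dictionary by hand.
def to_dict(line):
    luminosity, datetime = line.split(',')
    return {'luminosity': luminosity, 'datetime': datetime}

# ... | 'convert to dictionary' >> beam.Map(to_dict) | ...

A bare split like this breaks on quoted fields that contain commas; the csv-based source in the solution below handles that properly.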

Solution

The idea is to have a source that returns parsed CSV rows. You can do this by subclassing the FileBasedSource class to include CSV parsing. In particular, the read_records function would look something like this:

import csv

import apache_beam

class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    # Open the underlying file and parse it with the csv module,
    # yielding one parsed row (a list of column values) per record.
    self._file = self.open_file(file_name)

    reader = csv.reader(self._file)

    for rec in reader:
      yield rec
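
For completeness, a sketch of how this source might be wired into the pipeline from the question, using the same old-style Read/Write API. Since read_records yields lists, the Map indexes into each row; the table name and schema types are placeholders, not from the original answer:

(p
 | 'read solar data' >> beam.io.Read(MyCsvFileSource('./sensor1_121116.csv'))
 # Each record is a list of column values; index into it to build the dict.
 | 'convert to dictionary' >> beam.Map(
     lambda row: {'luminosity': row[0], 'datetime': row[1]})
 | 'save' >> beam.Write(
     beam.io.BigQuerySink(
         output_table,  # placeholder, e.g. 'project:dataset.table'
         schema='luminosity:STRING, datetime:STRING',  # assumed types
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))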

I recently wrote a CsvFileSource for Apache Beam. You can take a look at the GitHub repository. Install it with pip install beam_utils, and import it with from beam_utils.sources import CsvFileSource. CsvFileSource also includes options to set a custom delimiter, skip the file header, and/or output dictionaries instead of lists.
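
A usage sketch, assuming the options described above map to constructor keyword arguments (the name dictionary_output below is my reading of those options, not a confirmed signature; verify against the beam_utils source):

import apache_beam as beam
from beam_utils.sources import CsvFileSource

p = beam.Pipeline('DirectPipelineRunner')
(p
 # dictionary_output is an assumed keyword: with dictionary output, each
 # element becomes a dict keyed by the csv header row, ready for BigQuery.
 | 'read csv as dicts' >> beam.io.Read(
     CsvFileSource('./sensor1_121116.csv', dictionary_output=True))
 | 'save' >> beam.Write(
     beam.io.BigQuerySink(
         output_table,  # placeholder, e.g. 'project:dataset.table'
         schema='luminosity:STRING, datetime:STRING')))
p.run()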

