How to collect all results from a beam transform?


Problem description

I have a Beam pipeline that processes a rather large text file. The pipeline reads the text and extracts the data on each line into a dictionary. I want to write the dictionaries to a text file, but I am only able to write the dictionary keys.

The dictionaries look like this:

    {'Site_number': '09427500', 'Date': '2019-08-09 10:30:00', 
    'Reservoir_storage': '584900'}

However, what is written to my result_data.txt file is:

    Site_number
    Date
    Reservoir_storage

Here is a sample of the text I am trying to process:

    # Data provided for site 09427500
    #            TS   parameter     Description
    #          6385       00054     Reservoir storage, acre feet
    #
    # Data-value qualification codes included in this output:
    #     P  Provisional data subject to revision.
    # 
    agency_cd   site_no datetime    tz_cd   6385_00054  6385_00054_cd
    5s  15s 20d 6s  14n 10s
    USGS    09427500    2019-08-09 00:00    MST 580800  P
    USGS    09427500    2019-08-09 00:15    MST 581100  P
    USGS    09427500    2019-08-09 00:30    MST 581100  P
    USGS    09427500    2019-08-09 00:45    MST 581300  P
    USGS    09427500    2019-08-09 01:00    MST 581500  P
    USGS    09427500    2019-08-09 01:15    MST 581700  P

Here is my code:

import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
from dateutil import parser
import os

class ExtractData(beam.DoFn):

    def process(self, element):
        line = element[1]
        if not line.startswith('#'):
            dict = {}
            item = line.replace('\n', '').split('\t')
            date_item = item[2]
            try:
                 date = parser.parse(item[2]).strftime('%Y-%m-%d %H:%M:%S')
            except:
                date = date_item
            dict['Site_number'] = item[1]
            dict['Date'] = date
            dict['Reservoir_storage'] = item[-2]
            print(dict)
            return dict


def run():
    output = []
    p = beam.Pipeline('DirectRunner')
    data = ( p
        | 'Read text' >> beam.io.ReadFromTextWithFilename('reservoir_data.txt')
        | 'Process lines' >> beam.ParDo(ExtractData())
        | 'Write' >> beam.io.textio.WriteToText('result_data.txt')
    )

    result = p.run()
    result.wait_until_finish()


if __name__=="__main__":
    run()

Ultimately, I want a list of all the dictionaries. What am I doing wrong when writing the dictionaries to the file, or what am I not understanding here?

I am using Python 3.6.

Recommended answer

This is similar to this problem and test. Basically, the process method must return an iterable of output elements. Beam iterates over whatever process returns, and iterating over a dictionary produces its keys, which is why only the keys were written. Use either return [dict] or yield dict instead of return dict and it will work:

$ cat result_data.txt-00000-of-00001 
{'Date': '2019-08-09 00:00:00', 'Reservoir_storage': u'2019-08-09 00:00', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:15:00', 'Reservoir_storage': u'2019-08-09 00:15', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:30:00', 'Reservoir_storage': u'2019-08-09 00:30', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:45:00', 'Reservoir_storage': u'2019-08-09 00:45', 'Site_number': u'09427500'}
{'Date': '2019-08-09 01:00:00', 'Reservoir_storage': u'2019-08-09 01:00', 'Site_number': u'09427500'}
{'Date': '2019-08-09 01:15:00', 'Reservoir_storage': u'2019-08-09 01:15', 'Site_number': u'09427500'}
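
To see why only the keys ended up in the file, here is a minimal pure-Python sketch of the difference between returning the dictionary and yielding it. It does not use Beam at all: collect_outputs is a hypothetical stand-in for how a runner consumes the result of process, not Beam's actual implementation, and parse_line, process_with_return and process_with_yield are illustrative names that do not appear in the question. Date parsing is left out to keep the example free of third-party imports.

```python
def parse_line(line):
    """Build one record from a tab-separated data line, as in the question."""
    item = line.rstrip('\n').split('\t')
    return {
        'Site_number': item[1],
        'Date': item[2],
        'Reservoir_storage': item[-2],
    }


def process_with_return(line):
    # Mirrors the original process method: hands back the dict itself.
    return parse_line(line)


def process_with_yield(line):
    # Mirrors the fix: the generator produces the dict as a single element.
    yield parse_line(line)


def collect_outputs(process_fn, lines):
    # Hypothetical stand-in for the runner: every item obtained by iterating
    # over the result of process_fn becomes one output element.
    outputs = []
    for line in lines:
        outputs.extend(process_fn(line))
    return outputs


lines = ['USGS\t09427500\t2019-08-09 00:00\tMST\t580800\tP\n']

keys_only = collect_outputs(process_with_return, lines)
records = collect_outputs(process_with_yield, lines)

print(keys_only)  # the dictionary keys, one output element per key
print(records)    # a list holding the single complete dictionary
```

Iterating over a dictionary visits its keys, so the version that returns the dictionary emits three separate strings, while the version that yields it emits the dictionary as one element.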
