How to collect all results from a beam transform?
Problem description
I have a beam pipeline to process a rather large text file. The pipeline reads a text, and extracts line data into a dictionary. I want to write the dictionaries into a text file but I am only able to write the dictionary keys.
The dictionaries look like this:
{'Site_number': '09427500', 'Date': '2019-08-09 10:30:00',
'Reservoir_storage': '584900'}
However, what is written to my result_data.txt file is:
Site_number
Date
Reservoir_storage
Here is a sample of the text I am trying to process:
# Data provided for site 09427500
# TS parameter Description
# 6385 00054 Reservoir storage, acre feet
#
# Data-value qualification codes included in this output:
# P Provisional data subject to revision.
#
agency_cd site_no datetime tz_cd 6385_00054 6385_00054_cd
5s 15s 20d 6s 14n 10s
USGS 09427500 2019-08-09 00:00 MST 580800 P
USGS 09427500 2019-08-09 00:15 MST 581100 P
USGS 09427500 2019-08-09 00:30 MST 581100 P
USGS 09427500 2019-08-09 00:45 MST 581300 P
USGS 09427500 2019-08-09 01:00 MST 581500 P
USGS 09427500 2019-08-09 01:15 MST 581700 P
Here is my code:
import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
from dateutil import parser
import os


class ExtractData(beam.DoFn):

    def process(self, element):
        line = element[1]
        if not line.startswith('#'):
            dict = {}
            item = line.replace('\n', '').split('\t')
            date_item = item[2]
            try:
                date = parser.parse(item[2]).strftime('%Y-%m-%d %H:%M:%S')
            except:
                date = date_item
            dict['Site_number'] = item[1]
            dict['Date'] = date
            dict['Reservoir_storage'] = item[-2]
            print(dict)
            return dict


def run():
    output = []
    p = beam.Pipeline('DirectRunner')
    data = ( p
             | 'Read text' >> beam.io.ReadFromTextWithFilename('reservoir_data.txt')
             | 'Process lines' >> beam.ParDo(ExtractData())
             | 'Write' >> beam.io.textio.WriteToText('result_data.txt')
           )
    result = p.run()
    result.wait_until_finish()


if __name__=="__main__":
    run()
Ultimately, I want a list of all the dictionaries. What am I doing wrong when writing the dictionaries to file/what am I not understanding here?
I am using Python 3.6.
Recommended answer
This is similar to other questions about this problem. Basically, process needs to return an iterable, so use either return [dict] or yield dict instead of return dict. Beam iterates over whatever process returns, and iterating over a bare dictionary produces its keys, which is why only the keys were written. With that change it works:
$ cat result_data.txt-00000-of-00001
{'Date': '2019-08-09 00:00:00', 'Reservoir_storage': u'2019-08-09 00:00', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:15:00', 'Reservoir_storage': u'2019-08-09 00:15', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:30:00', 'Reservoir_storage': u'2019-08-09 00:30', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:45:00', 'Reservoir_storage': u'2019-08-09 00:45', 'Site_number': u'09427500'}
{'Date': '2019-08-09 01:00:00', 'Reservoir_storage': u'2019-08-09 01:00', 'Site_number': u'09427500'}
{'Date': '2019-08-09 01:15:00', 'Reservoir_storage': u'2019-08-09 01:15', 'Site_number': u'09427500'}
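The effect can be reproduced without Beam. The following is a minimal sketch in plain Python, not Beam's actual implementation: collect is a hypothetical stand-in for a runner that iterates over whatever the processing function returns, and extract_row_buggy and extract_row are illustrative names that mirror the question's parsing logic (without the date reformatting). The sample line assumes tab-separated fields, as in the question's code.

```python
def extract_row_buggy(line):
    """Mirrors the question's code: returns a bare dict for data lines."""
    if line.startswith('#'):
        return []  # nothing to emit for comment lines
    item = line.rstrip('\n').split('\t')
    return {
        'Site_number': item[1],
        'Date': item[2],
        'Reservoir_storage': item[-2],
    }


def extract_row(line):
    """Fixed version: a generator that yields the dict as one element."""
    if line.startswith('#'):
        return  # the generator ends without emitting anything
    item = line.rstrip('\n').split('\t')
    yield {
        'Site_number': item[1],
        'Date': item[2],
        'Reservoir_storage': item[-2],
    }


def collect(process_fn, lines):
    """Hypothetical stand-in for a runner: every item obtained by
    iterating over the return value becomes one output element."""
    out = []
    for line in lines:
        out.extend(process_fn(line))
    return out


# Assumed sample input: one comment line and one tab-separated data line.
lines = [
    '# Data provided for site 09427500\n',
    'USGS\t09427500\t2019-08-09 00:00\tMST\t580800\tP\n',
]

buggy = collect(extract_row_buggy, lines)
fixed = collect(extract_row, lines)

print(buggy)  # the dict was iterated, so only its keys come out
print(fixed)  # one element: the whole dict
```

If a single list of all dictionaries is needed inside the pipeline itself, Beam's beam.combiners.ToList() transform can be applied after the ParDo step to gather the PCollection into one list element. Whether that is practical depends on the data fitting in memory on one worker.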