How can I stop the extra repetition in the return/yield while still keeping the running totals for a given key/value pair?

Question

After passing the PCollection to the next transform, the output yielded by my transform is being multiplied, when I only need a single KV pair per street with its accident count.

My understanding is that generators can help with this by holding values, but that only solves part of my problem. I've tried to determine the size before sending to the next transform, but I haven't found any method that gives me the true size of the PCollection elements being passed.

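```python
import apache_beam as beam


class CountAccidents(beam.DoFn):
    # Class attribute: one dict shared by every element this DoFn processes,
    # so the counts keep accumulating across calls to process().
    acci_dict = {}

    def process(self, element):
        street = element[0]['STREET_NAME']
        if street in self.acci_dict:
            self.acci_dict[street] += 1
        else:
            self.acci_dict[street] = 1
        # The whole (growing) dict is yielded once per input element.
        if self.acci_dict != {}:
            yield self.acci_dict


def run():
    # AccidentstoDict (not shown in the question) appears to turn each CSV
    # line into a one-element list holding a dict, as in the input below.
    with beam.Pipeline() as pl:
        test = (pl
                | 'Read' >> beam.io.ReadFromText('/modified_Excel_Crashes_Chicago.csv')
                | 'Map Accident' >> beam.ParDo(AccidentstoDict())
                | 'Count Accidents' >> beam.ParDo(CountAccidents())
                | 'Print to Text' >> beam.io.WriteToText('/letstestthis', file_name_suffix='.txt'))
```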

Input PCollection:
[{'CRASH_DATE': '3/25/19 0:25', 'WEATHER_CONDITION': 'CLEAR', 'STREET_NAME': 'KOSTNER AVE', 'CRASH_HOUR': '0'}]
[{'CRASH_DATE': '3/24/19 23:40', 'WEATHER_CONDITION': 'CLEAR', 'STREET_NAME': 'ARCHER AVE', 'CRASH_HOUR': '23'}]
[{'CRASH_DATE': '3/24/19 23:30', 'WEATHER_CONDITION': 'UNKNOWN', 'STREET_NAME': 'VAN BUREN ST', 'CRASH_HOUR': '23'}]

I expect to get this: 
{'KILPATRICK AVE': 1, 'MILWAUKEE AVE': 1, 'CENTRAL AVE': 2, 'WESTERN AVE': 6, 'DANTE AVE': 1}

What I get is this (a slow build-up until complete):
{'KOSTNER AVE': 1}
{'KOSTNER AVE': 1, 'ARCHER AVE': 1}
{'KOSTNER AVE': 2, 'ARCHER AVE': 2, 'VAN BUREN ST': 1}
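The repetition comes from `CountAccidents` itself: it yields its whole accumulated dict once for every input element. A plain-Python simulation of that behavior (no Beam involved; `simulate_process` is a hypothetical helper, and the rows are the question's three input streets reduced to `STREET_NAME`) reproduces the same growing pattern:

```python
# Plain-Python stand-in for CountAccidents.process: one dict shared across
# all elements, with a snapshot emitted after every element.
def simulate_process(elements):
    acci_dict = {}
    emitted = []
    for element in elements:
        street = element[0]['STREET_NAME']
        acci_dict[street] = acci_dict.get(street, 0) + 1
        # Copy so each emitted value shows the state at that moment.
        emitted.append(dict(acci_dict))
    return emitted


rows = [
    [{'STREET_NAME': 'KOSTNER AVE'}],
    [{'STREET_NAME': 'ARCHER AVE'}],
    [{'STREET_NAME': 'VAN BUREN ST'}],
]
for snapshot in simulate_process(rows):
    print(snapshot)
# {'KOSTNER AVE': 1}
# {'KOSTNER AVE': 1, 'ARCHER AVE': 1}
# {'KOSTNER AVE': 1, 'ARCHER AVE': 1, 'VAN BUREN ST': 1}
```

Because `process` runs once per element, emitting from it yields one output per input. Also, on a distributed runner each worker process would hold its own copy of the class-level dict, so even the last emitted dict would not be a global total.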

Answer

You will need to do a combine per key; for counting, you can use the Count combiner here:

https://beam.apache.org/releases/pydoc/2.9.0/apache_beam.transforms.combiners.html

After your read operation, output a key/value pair of the form (STREET, 1), followed by a Count per key transform, which will give you the global count for each street.

From there it is straightforward to add windowing if you want, for example, the output per week: you just need to attach a timestamp to each element and apply a window before the count. An example of how to do that is here:

How to assign timestamps to data from a batch source (e.g., a CSV file) in a Beam batch pipeline
