How can I stop the extra repetition in the return/yield, while still keeping the running totals for a given key: value pair?

Question

After I pass the PCollection to the next transform, the output of the transform's return/yield is multiplied, when I only need a single KV pair for a given street and accident count.

My understanding is that generators can help here by holding values, but that only solves part of my problem. I've tried to determine the size before sending the data to the next transform, but I haven't found any method that gives me the true size of the PCollection elements being passed.

```python
class CountAccidents(beam.DoFn):
    acci_dict = {}

    def process(self, element):
        if self.acci_dict.__contains__(element[0]['STREET_NAME']):
            self.acci_dict[element[0]['STREET_NAME']] += 1
        else:
            self.acci_dict.update({element[0]['STREET_NAME']: 1})
        if self.acci_dict != {}:
            yield self.acci_dict


def run():
    with beam.Pipeline() as pl:
        test = (pl | 'Read' >> beam.io.ReadFromText('/modified_Excel_Crashes_Chicago.csv')
                   | 'Map Accident' >> beam.ParDo(AccidentstoDict())
                   | 'Count Accidents' >> beam.ParDo(CountAccidents())
                   | 'Print to Text' >> beam.io.WriteToText('/letstestthis', file_name_suffix='.txt'))
```

Input PCollection:
[{'CRASH_DATE': '3/25/19 0:25', 'WEATHER_CONDITION': 'CLEAR', 'STREET_NAME': 'KOSTNER AVE', 'CRASH_HOUR': '0'}]
[{'CRASH_DATE': '3/24/19 23:40', 'WEATHER_CONDITION': 'CLEAR', 'STREET_NAME': 'ARCHER AVE', 'CRASH_HOUR': '23'}]
[{'CRASH_DATE': '3/24/19 23:30', 'WEATHER_CONDITION': 'UNKNOWN', 'STREET_NAME': 'VAN BUREN ST', 'CRASH_HOUR': '23'}]

I expect to get this: 
{'KILPATRICK AVE': 1, 'MILWAUKEE AVE': 1, 'CENTRAL AVE': 2, 'WESTERN AVE': 6, 'DANTE AVE': 1}

What I get is this (a slow build-up until complete):
{'KOSTNER AVE': 1}
{'KOSTNER AVE': 1, 'ARCHER AVE': 1}
{'KOSTNER AVE': 2, 'ARCHER AVE': 2, 'VAN BUREN ST': 1}

Recommended answer

You will need to do a combine per key. For counting, you can use the Count transform documented here:

https://beam.apache.org/releases/pydoc/2.9.0/apache_beam.transforms.combiners.html

After your read operation, output a key-value pair of (STREET, 1), followed by a Count per key transform, which will give you the global count for each street.

From there it is easy to add windowing if, for example, you want the output per week: attach a timestamp to each element and apply a window before the count. An example of how to do that is here:
