Max and Min for several fields inside PCollection in apache beam with python


Question


I am using apache beam via python SDK and have the following problem:


I have a PCollection with approximately 1 mln entries. Each entry in the PCollection looks like a list of 2-tuples [(key1,value1),(key2,value2),...] with length ~150. I need to find the max and min values across all entries of the PCollection for each key in order to normalize the values.


Ideally, it will be good to obtain PCollection with a list of tuples [(key,max_value,min_value),...] and then it will be easy to proceed with normalization to get [(key1,norm_value1),(key2,norm_value2),...], where norm_value = (value - min) / (max - min)
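To make the target formula concrete, here is a plain-Python sketch of that final normalization step (outside Beam), assuming the per-key (min, max) bounds have already been collected into a dict; the function name and data shapes are illustrative, not from the original question:

```python
# Plain-Python sketch of the normalization step (no Beam involved):
# given per-key (min, max) bounds, rescale each value into [0, 1].

def normalize(pairs, bounds):
    """pairs: [(key, value), ...]; bounds: {key: (min, max)}."""
    result = []
    for key, value in pairs:
        low, high = bounds[key]
        result.append((key, (value - low) / (high - low)))
    return result

pairs = [('foo', 1), ('foo', 5), ('bar', 5), ('bar', 9), ('bar', 2)]
bounds = {'foo': (1, 5), 'bar': (2, 9)}
print(normalize(pairs, bounds))
# → [('foo', 0.0), ('foo', 1.0), ('bar', 0.42857142857142855), ('bar', 1.0), ('bar', 0.0)]
```

The hard part in Beam is producing the `bounds` side of this computation at scale, which is what the accepted answer addresses.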


At the moment I can only do it separately for each key by hand, which is not very convenient and not sustainable, so any suggestions will be helpful.

Answer

I decided to give it a go using a custom CombineFn to determine the minimum and maximum per key, then join the result with the input data using CoGroupByKey and apply the desired mapping to normalize the values.

"""Normalize PCollection values."""

import logging
import argparse

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


# custom CombineFn that outputs the (min, max) pair per key
class MinMaxFn(beam.CombineFn):
  # start with an empty range so negative values are handled too
  def create_accumulator(self):
    return (float('inf'), float('-inf'))

  # update the running min/max with the current value
  def add_input(self, min_max, input):
    (current_min, current_max) = min_max
    return min(current_min, input), max(current_max, input)

  # combine partial accumulators from parallel workers
  def merge_accumulators(self, accumulators):
    mins, maxs = zip(*accumulators)
    return min(mins), max(maxs)

  def extract_output(self, min_max):
    return min_max


def run(argv=None):
  """Main entry point; defines and runs the pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--output',
                      dest='output',
                      required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)

  # create test data
  pc = p | 'Create test data' >> beam.Create(
      [('foo', 1), ('bar', 5), ('foo', 5), ('bar', 9), ('bar', 2)])

  # first pass over the data: apply the custom CombineFn to get min/max per key
  minmax = pc | 'Determine Min Max' >> beam.CombinePerKey(MinMaxFn())

  # group input data by key and attach the corresponding (min, max) pair
  merged = (pc, minmax) | 'Join PCollections' >> beam.CoGroupByKey()

  # normalize each value according to norm_value = (value - min) / (max - min)
  def normalize(element):
    key, (values, min_max_list) = element
    low, high = min_max_list[0]
    return key, [float(v - low) / (high - low) for v in values]

  normalized = merged | 'Normalize values' >> beam.Map(normalize)

  # write results to output file
  normalized | 'Write results' >> WriteToText(known_args.output)

  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()


The snippet can be run with python SCRIPT_NAME.py --output OUTPUT_FILENAME. My test data, grouped by key, is:

('foo', [1, 5])
('bar', [5, 9, 2])


The CombineFn will return the per-key min and max:

('foo', (1, 5))
('bar', (2, 9))
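The combiner lifecycle can be sanity-checked without running a pipeline. This plain-Python sketch (no Beam dependency, function names chosen to mirror the CombineFn methods) replays the create/add/merge steps for the 'bar' key, as Beam would when two workers each see part of the values:

```python
# Plain-Python replay of the per-key min/max combiner lifecycle,
# mimicking how Beam merges partial accumulators from workers.

def create_accumulator():
    return (float('inf'), float('-inf'))

def add_input(acc, value):
    lo, hi = acc
    return min(lo, value), max(hi, value)

def merge_accumulators(accs):
    los, his = zip(*accs)
    return min(los), max(his)

# two "workers" each see part of the 'bar' values: [5, 9] and [2]
acc1 = add_input(add_input(create_accumulator(), 5), 9)
acc2 = add_input(create_accumulator(), 2)
print(merge_accumulators([acc1, acc2]))
# → (2, 9)
```

Merging must itself take the min of mins and max of maxs, since Beam is free to build any number of partial accumulators in parallel.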


The output of the join/cogroup by key operation:

('foo', ([1, 5], [(1, 5)]))
('bar', ([5, 9, 2], [(2, 9)]))

After normalization:

('foo', [0.0, 1.0])
('bar', [0.42857142857142855, 1.0, 0.0])


This was just a simple test, so I'm sure it can be optimized for the mentioned volume of data, but it seems to work as a starting point. Take into account that further considerations might be needed (e.g. avoid dividing by zero if min = max).
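One possible guard for the min = max case mentioned above is to fall back to a constant; the helper name and the fallback value 0.0 are assumptions for illustration, not part of the original answer:

```python
# Guard against max == min (all values for a key identical):
# return a fixed 0.0 instead of dividing by zero. The fallback
# value is an arbitrary choice for this sketch.

def safe_normalize(value, low, high):
    if high == low:
        return 0.0
    return (value - low) / (high - low)

print(safe_normalize(5, 5, 5))  # → 0.0
print(safe_normalize(5, 2, 9))  # → 0.42857142857142855
```

Inside the pipeline this check would go in the normalization mapping, so that keys whose values are all identical still produce a well-defined output.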

