Why is the combine function called three times?


Question

I'm trying to understand the Combine transform in an Apache Beam pipeline.

Consider the following example pipeline:

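import datetime
import logging

import apache_beam as beam
from apache_beam import WindowInto
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


def test_combine(data):
    # Log the type and content of whatever the runner passes in.
    logging.info('test combine')
    logging.info(type(data))
    logging.info(data)
    return [1, 2, 3]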


def run():
    logging.info('start pipeline')
    pipeline_options = PipelineOptions(
        None, streaming=True, save_main_session=True,
    )
    p = beam.Pipeline(options=pipeline_options)

    data = p | beam.Create([
        {'id': '1', 'ts': datetime.datetime.utcnow()},
        {'id': '2', 'ts': datetime.datetime.utcnow()},
        {'id': '3', 'ts': datetime.datetime.utcnow()}
    ])

    purchase_paths = (
        data
        | WindowInto(FixedWindows(10))
        | beam.CombineGlobally(test_combine).without_defaults()
    )

    result = p.run()
    result.wait_until_finish()
    logging.info('end pipeline')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

This produces the following log output:

INFO:root:test combine
INFO:root:<class 'list'>
INFO:root:[{'id': '1', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193363)}, {'id': '2', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193366)}, {'id': '3', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193367)}]
INFO:root:test combine
INFO:root:<class 'apache_beam.transforms.core._ReiterableChain'>
INFO:root:<apache_beam.transforms.core._ReiterableChain object at 0x1210faf50>
INFO:root:test combine
INFO:root:<class 'list'>
INFO:root:[[1, 2, 3]]
INFO:root:end pipeline

Why is the combine function called three times, receiving a different input each time? In the last call it seems to receive its own return value as input.

Update

My understanding of the combiner was wrong. The documentation says:

The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key.

Indeed, the output of the combiner can be fed back into the combiner as input, to be aggregated with the remaining items of the PCollection. The output of the combiner therefore needs to have the same format as its input.
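To make the "same format in and out" requirement concrete, here is a minimal pure-Python sketch that does not use Beam at all. The function name `combine_records` and the sample records are assumptions made for illustration; the point is only that a result can be fed back in alongside raw elements and still produce the same answer.

```python
def combine_records(records):
    """Merge records into one record that has the same shape as each input."""
    records = list(records)  # the input may be any iterable
    return {
        'id': '+'.join(r['id'] for r in records),
        'ts': max(r['ts'] for r in records),
    }


# Hypothetical sample records, shaped like the ones in the question.
a = {'id': '1', 'ts': 100}
b = {'id': '2', 'ts': 105}
c = {'id': '3', 'ts': 112}

# Combining everything at once ...
direct = combine_records([a, b, c])
# ... gives the same result as combining a partial result with the rest.
staged = combine_records([combine_records([a, b]), c])

print(direct)
print(staged)
```

Taking the maximum of `ts` is both associative and commutative. Joining the ids is associative but depends on element order, so a runner that reorders elements could produce a different id string.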

Also, as Inigo pointed out, I needed to attach a timestamp to each element of the PCollection so that the windowing works properly.

Here is the updated example:

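import logging
import time

import apache_beam as beam
from apache_beam import WindowInto
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

combine_count = 0


def test_combine(data):
    global combine_count
    combine_count += 1
    # Materialize once: the input may be any iterable, not only a list.
    data = list(data)
    logging.info(f'test combine: {combine_count}')
    logging.info(f'input: {data}')
    combined_id = '+'.join(d['id'] for d in data)
    combined_ts = max(d['ts'] for d in data)
    # The output has the same shape as each input element.
    combined = {'id': combined_id, 'ts': combined_ts}
    logging.info(f'output: {combined}')
    return combined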


def run():
    logging.info('start pipeline')
    pipeline_options = PipelineOptions(
        None, streaming=True, save_main_session=True,
    )
    p = beam.Pipeline(options=pipeline_options)

    ts = int(time.time())

    data = p | beam.Create([
        {'id': '1', 'ts': ts},
        {'id': '2', 'ts': ts + 5},
        {'id': '3', 'ts': ts + 12}
    ])

    purchase_paths = (
        data
        | 'With timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['ts']))
        | WindowInto(FixedWindows(10))
        | beam.CombineGlobally(test_combine).without_defaults()
    )

    result = p.run()
    result.wait_until_finish()
    logging.info('end pipeline')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

The output of this example is:

INFO:root:test combine: 1
INFO:root:input: [{'id': '2', 'ts': 1596791192}, {'id': '3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((CombineGlobally(test_combine)/CombinePerKey/Group/Read)+(CombineGlobally(test_combine)/CombinePerKey/Merge))+(CombineGlobally(test_combine)/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_CombineGlobally(test_combine)/UnKey_28)
INFO:root:test combine: 2
INFO:root:input: [{'id': '1', 'ts': 1596791187}]
INFO:root:output: {'id': '1', 'ts': 1596791187}
INFO:root:test combine: 3
INFO:root:input: [{'id': '1', 'ts': 1596791187}]
INFO:root:output: {'id': '1', 'ts': 1596791187}
INFO:root:test combine: 4
INFO:root:input: [{'id': '2+3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:root:test combine: 5
INFO:root:input: [{'id': '2+3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:root:end pipeline

I still don't fully understand why the combiner is called that many times, but according to the documentation this can happen.
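The grouping in the log above (ids 2 and 3 combined, id 1 on its own) is consistent with how fixed 10-second windows partition the timestamps. The sketch below is a simplified pure-Python model of that assignment, not Beam's own code; `fixed_window_start` is a hypothetical helper that assumes windows aligned to multiples of the window size (offset 0), and the timestamps are copied from the log.

```python
def fixed_window_start(timestamp, size=10, offset=0):
    """Start of the fixed window that contains `timestamp` (simplified model)."""
    return timestamp - (timestamp - offset) % size


# Timestamps taken from the log output above.
records = [
    {'id': '1', 'ts': 1596791187},
    {'id': '2', 'ts': 1596791192},
    {'id': '3', 'ts': 1596791199},
]

# Group record ids by the window their timestamp falls into.
windows = {}
for record in records:
    start = fixed_window_start(record['ts'])
    windows.setdefault(start, []).append(record['id'])

for start, ids in sorted(windows.items()):
    print(f'window [{start}, {start + 10}): {ids}')
```

Under this model, id 1 falls into the window starting at 1596791180, while ids 2 and 3 share the window starting at 1596791190, so the combiner runs separately for each window.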

Recommended answer

This appears to happen because of the MapReduce-style structure. With combiners, the output of one combiner call is used as input to another.

As an example, imagine summing 3 numbers (1, 2, 3). The combiner MAY first sum 1 and 2 (giving 3) and then use that result as input together with 3 (3 + 3 = 6). In your case, [1, 2, 3] seems to be used as input to the next combiner call.
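The staged behaviour can be imitated in plain Python. This is only a simulation under assumptions: `combine_in_stages` is a hypothetical helper, and the chunking it uses is arbitrary, since a real runner decides for itself how to split the work.

```python
def combine_in_stages(combine_fn, values, chunk_size=2):
    """Hypothetical helper: combine chunks first, then combine the partial results."""
    chunks = [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]
    partials = [combine_fn(chunk) for chunk in chunks]
    return combine_fn(partials)


# sum is associative and commutative, so staging does not change the result.
staged_sum = combine_in_stages(sum, [1, 2, 3])
direct_sum = sum([1, 2, 3])
print(staged_sum, direct_sum)

# A function like the one in the question ignores its input and returns a
# fixed list, so the final stage receives that list as a single element.
seen_inputs = []


def constant_combine(data):
    seen_inputs.append(list(data))
    return [1, 2, 3]


combine_in_stages(constant_combine, [{'id': '1'}, {'id': '2'}, {'id': '3'}], chunk_size=3)
print(seen_inputs[-1])
```

The last recorded input matches the `[[1, 2, 3]]` seen in the question's log: the function is being handed its own earlier return value.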

An example that really helped me understand this:

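import apache_beam as beam

p = beam.Pipeline()


def make_list(elements):
    # Print whatever this call receives: raw elements or earlier partial results.
    print(elements)
    return elements


(p | beam.Create(range(30))
   | beam.CombineGlobally(make_list))

p.run()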

Note that the element [1,..,10] is used as input in the next combiner call.
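One way to keep partial lists from showing up as elements is to separate adding elements from merging partial results. Beam's `CombineFn` interface does this with the methods `create_accumulator`, `add_input`, `merge_accumulators` and `extract_output`. The sketch below mirrors those method names in plain Python so it runs without Beam; the class name `ListCombiner` and the hand-written driver loop are assumptions for illustration, not how a runner actually schedules the calls.

```python
class ListCombiner:
    """Collects elements into a list; mirrors the CombineFn method names."""

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        # Called with one raw element at a time.
        accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        # Called with partial results, which are flattened, not nested.
        merged = []
        for accumulator in accumulators:
            merged.extend(accumulator)
        return merged

    def extract_output(self, accumulator):
        return accumulator


combiner = ListCombiner()

# Simulate three workers that each accumulate 10 of the 30 elements.
partials = []
for start in range(0, 30, 10):
    accumulator = combiner.create_accumulator()
    for element in range(start, start + 10):
        accumulator = combiner.add_input(accumulator, element)
    partials.append(accumulator)

result = combiner.extract_output(combiner.merge_accumulators(partials))
print(result)
```

Because merging is a separate step, the combiner never has to guess whether an item is a raw element or an earlier partial result.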
