Why is the combine function called three times?

Question

I'm trying to understand the combine transform in an Apache Beam pipeline.

Consider the following example pipeline:

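import datetime
import logging

import apache_beam as beam
from apache_beam import WindowInto
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


def test_combine(data):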
    logging.info('test combine')
    logging.info(type(data))
    logging.info(data)
    return [1, 2, 3]


def run():
    logging.info('start pipeline')
    pipeline_options = PipelineOptions(
        None, streaming=True, save_main_session=True,
    )
    p = beam.Pipeline(options=pipeline_options)

    data = p | beam.Create([
        {'id': '1', 'ts': datetime.datetime.utcnow()},
        {'id': '2', 'ts': datetime.datetime.utcnow()},
        {'id': '3', 'ts': datetime.datetime.utcnow()}
    ])

    purchase_paths = (
        data
        | WindowInto(FixedWindows(10))
        | beam.CombineGlobally(test_combine).without_defaults()
    )

    result = p.run()
    result.wait_until_finish()
    logging.info('end pipeline')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

It produces the following log output:

INFO:root:test combine
INFO:root:<class 'list'>
INFO:root:[{'id': '1', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193363)}, {'id': '2', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193366)}, {'id': '3', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193367)}]
INFO:root:test combine
INFO:root:<class 'apache_beam.transforms.core._ReiterableChain'>
INFO:root:<apache_beam.transforms.core._ReiterableChain object at 0x1210faf50>
INFO:root:test combine
INFO:root:<class 'list'>
INFO:root:[[1, 2, 3]]
INFO:root:end pipeline

Why is the combine function called three times, and why does it receive a different input each time? In the last call it seems to receive its own return value as input.

Update

I had misunderstood how the combiner works. The documentation says:

The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key.
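A small plain-Python illustration of why the quoted requirement matters (no Beam involved): summing gives the same answer however the values are grouped, while averaging partial averages does not. The (sum, count) pair used at the end is a common way to make an average safe to combine in stages; the helper name combine_sum_count is hypothetical.

```python
from statistics import mean

values = [1, 2, 3]

# sum is associative and commutative: any grouping or order gives 6.
grouped_left = sum([sum([1, 2]), 3])
grouped_right = sum([sum([3, 2]), 1])

# mean is not associative: averaging partial averages changes the result.
mean_of_means = mean([mean([1, 2]), 3])  # mean([1.5, 3])
true_mean = mean(values)


def combine_sum_count(pairs):
    """Combine (sum, count) pairs; associative, so it is safe to re-apply."""
    pairs = list(pairs)
    return (sum(s for s, _ in pairs), sum(n for _, n in pairs))


# Each raw value enters as (value, 1); partial results are (sum, count) too,
# so the function's output has the same shape as its input elements.
partial = combine_sum_count([(1, 1), (2, 1)])
total_sum, total_count = combine_sum_count([partial, (3, 1)])

print(grouped_left, grouped_right, mean_of_means, true_mean, total_sum / total_count)
# 6 6 2.25 2 2.0
```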

Indeed, the output of the combiner can be fed back into the combiner as input, to be aggregated with later items of the PCollection. Therefore the combiner's output must have the same format as its input elements.
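One way to check this property without running a pipeline is to apply the combining logic of the updated example (below) both in one go and in stages. The sketch strips out the logging and uses a hypothetical name, combine_ids, with made-up timestamps. Note that '+'.join is associative but not commutative, so the order of the ids in the combined string can depend on the order in which Beam happens to deliver the values.

```python
def combine_ids(records):
    """Merge dicts of the form {'id': str, 'ts': int} into one such dict."""
    records = list(records)  # the input may be a lazy iterable; read it once
    return {
        'id': '+'.join(r['id'] for r in records),
        'ts': max(r['ts'] for r in records),
    }


a = {'id': '1', 'ts': 100}
b = {'id': '2', 'ts': 105}
c = {'id': '3', 'ts': 112}

all_at_once = combine_ids([a, b, c])
in_stages = combine_ids([combine_ids([a, b]), c])  # output re-used as input
single = combine_ids([all_at_once])                # re-combining one value
reordered = combine_ids([c, a, b])                 # same data, other order

print(all_at_once)  # {'id': '1+2+3', 'ts': 112}
print(in_stages)    # {'id': '1+2+3', 'ts': 112}
print(reordered)    # {'id': '3+1+2', 'ts': 112}
```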

Also, as Inigo pointed out, I needed to set timestamps on the elements of the PCollection so that windowing works properly.
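To see which elements end up in the same 10-second window, here is a plain-Python sketch of fixed-window assignment. It assumes the usual rule that a fixed window of size `size` starts at `ts - (ts - offset) % size`, which is my understanding of how FixedWindows assigns elements; the function name fixed_window is hypothetical. The timestamps are the ones printed in the log of the updated example.

```python
def fixed_window(ts, size=10, offset=0):
    """Return the [start, end) bounds of the fixed window that contains ts.

    Assumption: windows start at multiples of `size` (shifted by `offset`)
    counted from the Unix epoch.
    """
    start = ts - (ts - offset) % size
    return (start, start + size)


# Timestamps (in seconds) printed in the log of the updated example.
timestamps = {'1': 1596791187, '2': 1596791192, '3': 1596791199}
windows = {key: fixed_window(ts) for key, ts in timestamps.items()}
print(windows)
# {'1': (1596791180, 1596791190), '2': (1596791190, 1596791200),
#  '3': (1596791190, 1596791200)}
```

Under this assumption elements '2' and '3' share a window while '1' falls into the previous one, which is consistent with the log: one call combines '2' and '3', and '1' is only ever combined with itself. Because the data uses ts, ts + 5 and ts + 12, the exact grouping depends on where ts falls relative to a 10-second boundary.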

Here is the updated example:

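import logging
import time

import apache_beam as beam
from apache_beam import WindowInto
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

combine_count = 0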

def test_combine(data):
    global combine_count
    combine_count += 1
    # The input may be a lazy iterable (e.g. _ReiterableChain); read it once.
    data = list(data)
    logging.info(f'test combine: {combine_count}')
    logging.info(f'input: {data}')
    combined_id = '+'.join(d['id'] for d in data)
    combined_ts = max(d['ts'] for d in data)
    # Same shape as the input elements, so it can be combined again later.
    combined = {'id': combined_id, 'ts': combined_ts}
    logging.info(f'output: {combined}')
    return combined


def run():
    logging.info('start pipeline')
    pipeline_options = PipelineOptions(
        None, streaming=True, save_main_session=True,
    )
    p = beam.Pipeline(options=pipeline_options)

    ts = int(time.time())

    data = p | beam.Create([
        {'id': '1', 'ts': ts},
        {'id': '2', 'ts': ts + 5},
        {'id': '3', 'ts': ts + 12}
    ])

    purchase_paths = (
        data
        | 'With timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['ts']))
        | WindowInto(FixedWindows(10))
        | beam.CombineGlobally(test_combine).without_defaults()
    )

    result = p.run()
    result.wait_until_finish()
    logging.info('end pipeline')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

The output of this example looks like this:

INFO:root:test combine: 1
INFO:root:input: [{'id': '2', 'ts': 1596791192}, {'id': '3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((CombineGlobally(test_combine)/CombinePerKey/Group/Read)+(CombineGlobally(test_combine)/CombinePerKey/Merge))+(CombineGlobally(test_combine)/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_CombineGlobally(test_combine)/UnKey_28)
INFO:root:test combine: 2
INFO:root:input: [{'id': '1', 'ts': 1596791187}]
INFO:root:output: {'id': '1', 'ts': 1596791187}
INFO:root:test combine: 3
INFO:root:input: [{'id': '1', 'ts': 1596791187}]
INFO:root:output: {'id': '1', 'ts': 1596791187}
INFO:root:test combine: 4
INFO:root:input: [{'id': '2+3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:root:test combine: 5
INFO:root:input: [{'id': '2+3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:root:end pipeline

I still don't fully understand why the combiner is called so many times, but according to the documentation this may happen.

Recommended answer

This appears to be a consequence of the MapReduce-style structure: when a combiner is used, the output of one combiner invocation is used as input to another.

For example, imagine summing three numbers (1, 2, 3). The combiner may first add 1 and 2 (giving 3) and then use that result as an input together with 3 (3 + 3 = 6). In your case, [1, 2, 3] appears to be used as an input to the next combiner call.

An example that really helped me understand this:

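import apache_beam as beam


def make_list(elements):
    # Materialize the (possibly lazy) input so the print shows the values.
    elements = list(elements)
    print(elements)
    return elements


# The pipeline runs when the with-block exits.
with beam.Pipeline() as p:
    (p
     | beam.Create(range(30))
     | beam.CombineGlobally(make_list))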

Notice that a list produced by one call, such as [1,..,10], shows up as a single element in the input of a later call.
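To make the repeated calls concrete, here is a plain-Python model of how a runner might drive a plain combine callable. My understanding is that the Python SDK wraps a callable passed to CombineGlobally in a CombineFn (CallableWrapperCombineFn) that buffers inputs, calls the function on partial batches, on merged accumulators, and once more to extract the output. The class CallableCombiner, the buffer size of 10 and the even/odd bundle split below are assumptions for illustration, not Beam's actual code.

```python
from itertools import chain

BUFFER_SIZE = 10  # assumed batch size that triggers an early call of fn


class CallableCombiner:
    """Hypothetical model of a CombineFn that wraps a plain callable."""

    def __init__(self, fn):
        self.fn = fn
        self.calls = []  # every input list that fn receives, in order

    def _call(self, values):
        values = list(values)
        self.calls.append(values)
        return self.fn(values)

    def create_accumulator(self):
        return []

    def add_input(self, acc, element):
        acc.append(element)
        if len(acc) >= BUFFER_SIZE:
            # Compact the buffer: fn's output replaces the values it consumed.
            acc = [self._call(acc)]
        return acc

    def merge_accumulators(self, accs):
        # fn sees raw elements and earlier outputs of fn side by side.
        return [self._call(chain.from_iterable(accs))]

    def extract_output(self, acc):
        # One final call, even if acc holds a single already-combined value.
        return self._call(acc)


def simulate(fn, elements, num_bundles=2):
    combiner = CallableCombiner(fn)
    # Split the input into bundles, as a runner might before the shuffle.
    bundles = [elements[i::num_bundles] for i in range(num_bundles)]
    partials = []
    for bundle in bundles:
        acc = combiner.create_accumulator()
        for element in bundle:
            acc = combiner.add_input(acc, element)
        partials.append(acc)
    merged = combiner.merge_accumulators(partials)
    return combiner.extract_output(merged), combiner.calls


result, calls = simulate(sum, list(range(30)))
print(result)      # 435
print(len(calls))  # 4
print(calls[-1])   # [435]: fn receives its own earlier output
```

In this model sum still produces the correct total of 0..29 even though it is called four times. A function like the question's first test_combine, which ignores its input and returns [1, 2, 3], would receive [[1, 2, 3]] in its final call, similar to the last input logged in the question.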
