Dataflow: Using Top module with Python SDK: single-element PCollection

Question

I was looking at the word_counting.py example in the incubator-beam repository (linked from the Dataflow documentation), and I want to modify it to get the n words with the most occurrences. Here is my pipeline:

  counts = (lines
        | 'split' >> (beam.ParDo(WordExtractingDoFn())
                      .with_output_types(unicode))
        | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
        | 'group' >> beam.GroupByKey()
        | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c))  # 'top' is the only added line

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))

I added a line using the Top.Of() method, but it seems to return a PCollection with an array as its single element. (I was expecting an ordered PCollection, but according to the documentation, PCollections are unordered collections.)

When the pipeline runs, beam.Map iterates over only one element (the entire array), and in 'format' the lambda function raises an error, since it cannot unpack the entire array into the tuple (word, c).

How should I handle this single-element PCollection without interrupting the pipeline at this step?

Answer

If you want to expand a PCollection of iterables into a PCollection of the elements of these iterables, you can use FlatMap, whose argument is a function from elements to an iterable of results: in your case, the elements are iterables themselves, so we use the identity function.

  counts = ...
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c)
        | 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic!

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  ...
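
To make the difference between Map and FlatMap concrete, here is a minimal plain-Python sketch that runs without Beam. It is only an analogy: the helpers `top_of`, `map_each`, and `flat_map` are hypothetical stand-ins written for this illustration, not Beam APIs, and ordinary lists stand in for PCollections. The sample input is also made up.

```python
import heapq
from collections import Counter


def top_of(elements, n, key):
    """Stand-in for Top.Of: a collection holding ONE element, the list of the n largest."""
    return [heapq.nlargest(n, elements, key=key)]


def map_each(collection, fn):
    """Stand-in for Map: exactly one output per input element."""
    return [fn(element) for element in collection]


def flat_map(collection, fn):
    """Stand-in for FlatMap: fn returns an iterable whose items are emitted one by one."""
    return [item for element in collection for item in fn(element)]


# Hypothetical input, playing the role of the (word, count) pairs after 'count'.
lines = ["a b a c", "b a d"]
counts = list(Counter(word for line in lines for word in line.split()).items())

# One element: the whole list of the top 2 (word, count) pairs.
top = top_of(counts, 2, key=lambda word_count: word_count[1])

# The identity function expands that single list into its individual pairs.
expanded = flat_map(top, lambda word_counts: word_counts)

# Formatting now sees one (word, count) pair at a time.
formatted = map_each(expanded, lambda wc: "%s: %s" % (wc[0], wc[1]))
```

In this model, applying `map_each` directly to `top` would hand the formatting function the whole list at once, which mirrors the error described in the question; `flat_map` with the identity function is what turns the one list into separate pairs.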
