How to count elements per window

Question

I'm trying to solve what seems to be an easy problem: counting how many elements a PCollection contains per window. I need the count to pass to .withSharding() on write, so that there are as many shards as there are files to be written.

Here is what I tried:

FileIO.writeDynamic<Long, E>()
    .withDestinationCoder(AvroCoder.of(Long::class.java))
    .by { e -> e.key }
    .via(Contextful.fn(MySerFunction()))
    .withNaming({ key -> MyFileNaming() })
    .withSharding(ShardingFn())
    .to("gs://some-output")

class ShardingFn : PTransform<PCollection<E>, PCollectionView<Int>>() {
    override fun expand(input: PCollection<E>): PCollectionView<Int> {

        val keys: PCollection<Long> = input.apply(Keys.create())

        // This only works with GlobalWindowing, how to count per window?
        val count: PCollection<Long> = keys.apply(Count.globally())

        val int: PCollection<Int> = count.apply(MapElements.via(Long2Int))
        return int.apply(View.asSingleton())
    }
}

However, this works only with global windowing (aka "batch mode"); with any other windowing, Count.globally() throws an exception.

Maybe this is the wrong approach for writing, but if I ever need to count elements per window for some other reason, how do I do that?

Recommended answer

Using Combine.globally(Count.<T>combineFn()).withoutDefaults() instead of Count.globally() should work in your case. This is also noted in the Javadoc: https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/transforms/Count.html#globally--
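For illustration, here is a minimal Kotlin sketch of how the sharding transform from the question might look with that change. It is a sketch under assumptions, not a tested drop-in: PerWindowShardingFn and LongToInt are hypothetical names (LongToInt stands in for the question's Long2Int, whose definition is not shown), and the Keys step is dropped because the elements can be counted directly. As far as I understand it, Count.globally() fails on non-global windows because it tries to emit a default value of 0 for empty input, which Beam only supports in the global window; withoutDefaults() turns that off, so an empty window simply produces no count.

```kotlin
import org.apache.beam.sdk.transforms.Combine
import org.apache.beam.sdk.transforms.Count
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.transforms.SimpleFunction
import org.apache.beam.sdk.transforms.View
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.PCollectionView

// Hypothetical helper (assumption): stands in for the question's Long2Int.
class LongToInt : SimpleFunction<Long, Int>() {
    override fun apply(input: Long): Int = input.toInt()
}

// Hypothetical replacement for ShardingFn: counts the elements of each window
// and exposes the count as a per-window singleton side input.
class PerWindowShardingFn<T> : PTransform<PCollection<T>, PCollectionView<Int>>() {
    override fun expand(input: PCollection<T>): PCollectionView<Int> {
        // withoutDefaults() makes the combine emit nothing for an empty window
        // instead of a default 0, which is what allows non-global windowing.
        val count: PCollection<Long> = input.apply(
            Combine.globally(Count.combineFn<T>()).withoutDefaults()
        )

        return count
            .apply(MapElements.via(LongToInt()))
            .apply(View.asSingleton<Int>())
    }
}
```

In the write from the question this would be passed as .withSharding(PerWindowShardingFn()). One consequence of this design is that a window with no elements has no value in the singleton view, so reading the view for such a window would fail; that should not matter when the view is only consulted for windows that actually contain elements to write.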
