How to output the result of a windowed aggregation only when the window is finished?

Question

I have a KStream in which I want to count some dimension of the events. I do it as follows:

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
  .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
  .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));

I want a new KStream that carries those aggregations as events. I can do that easily like this:

ret.toStream().to("output");

The problem is that every event in the "input" topic produces an event in the "output" topic. I would like to publish an event to the output topic only when a window is finished. For example, if the window is one minute long, send a single event per key per minute.
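To make the behaviour concrete, here is a minimal plain-Java sketch (no Kafka dependency) of a 10-second tumbling-window sum that forwards an update for every input record. The class name, the `key@windowStart=sum` output format and the sample records are assumptions made for illustration; this is not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerRecordUpdates {
    // Hypothetical window size, matching the 10-second window in the question.
    static final long WINDOW_SIZE_MS = 10_000L;

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Feeds {key, timestampMs, dimension} records through a running sum and
    // returns every update that would be forwarded downstream.
    static List<String> updates(long[][] records) {
        Map<String, Long> sums = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (long[] r : records) {
            String windowedKey = r[0] + "@" + windowStart(r[1]);
            long sum = sums.merge(windowedKey, r[2], Long::sum);
            // One downstream update per input record, not one per window.
            out.add(windowedKey + "=" + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] records = { {1, 1_000, 5}, {1, 4_000, 2}, {1, 12_000, 7} };
        System.out.println(updates(records)); // prints [1@0=5, 1@0=7, 1@10000=7]
    }
}
```

The window starting at 0 shows up twice (5, then 7) even though only the last value is final. In a real Kafka Streams application record caching may coalesce some of these intermediate updates, but it does not guarantee exactly one record per window.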

I think I could do something like this:

ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));

But I wonder if there's a better / more elegant way of doing this?

Recommended answer

You can use KTable.suppress, a KTable feature that is new in version 2.1.

This method lets you get exactly one final result per window/key for windowed computations.

More about suppress in KIP-328.

You can update your implementation with suppress like this:

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
        .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
        .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()))
        .suppress(untilWindowCloses(BufferConfig.unbounded()));

ret.toStream().to("output"); // the stream now sends a result to the output topic only once its window has closed
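To illustrate what "only when the window closes" means, here is a plain-Java simulation (again without Kafka) of the emit-on-close idea: results are buffered per window and key, and released once stream time (the highest timestamp seen so far) reaches the window end plus a grace period. The class name, the `GRACE_MS` constant and the sample records are assumptions; the real operator is described in KIP-328 and may differ in detail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FinalResultsOnly {
    static final long WINDOW_SIZE_MS = 10_000L; // 10-second tumbling windows
    static final long GRACE_MS = 0L;            // assumed grace period for late records

    // Takes {key, timestampMs, dimension} records and returns one
    // "key@windowStart=sum" entry per key and window, emitted at window close.
    static List<String> finalResults(long[][] records) {
        // Buffer of open windows: windowStart -> (key -> running sum).
        TreeMap<Long, Map<Long, Long>> buffer = new TreeMap<>();
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long[] r : records) {
            long key = r[0], ts = r[1], dim = r[2];
            streamTime = Math.max(streamTime, ts); // stream time never goes backwards
            long start = ts - (ts % WINDOW_SIZE_MS);
            // Accept the record only if its window is still open; late ones are dropped.
            if (start + WINDOW_SIZE_MS + GRACE_MS > streamTime) {
                buffer.computeIfAbsent(start, s -> new TreeMap<>()).merge(key, dim, Long::sum);
            }
            // Emit and evict every buffered window that has closed by now.
            Iterator<Map.Entry<Long, Map<Long, Long>>> it = buffer.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Map<Long, Long>> w = it.next();
                if (w.getKey() + WINDOW_SIZE_MS + GRACE_MS > streamTime) break; // still open
                for (Map.Entry<Long, Long> e : w.getValue().entrySet()) {
                    out.add(e.getKey() + "@" + w.getKey() + "=" + e.getValue());
                }
                it.remove();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] records = {
            {1, 1_000, 5}, {1, 4_000, 2}, {2, 6_000, 3},
            {1, 12_000, 7},  // advances stream time past the end of window 0
            {1, 3_000, 100}, // late record for the closed window 0: dropped
            {1, 21_000, 1}   // closes window 10000; window 20000 stays buffered
        };
        System.out.println(finalResults(records)); // prints [1@0=7, 2@0=3, 1@10000=7]
    }
}
```

Two consequences are visible in the sketch. First, a window is emitted only when a later record advances stream time, so the last window stays buffered until more input arrives. Second, the grace period delays emission: as far as I know, in the 2.x releases a window defined with TimeWindows.of(...) and no explicit grace keeps a default grace period of roughly 24 hours, so you may want to add .grace(...) to the window definition if the suppressed results seem never to arrive.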
