How to output result of windowed aggregation only when window is finished?

Question

I have a KStream in which I want to count some dimension of the events. I do it as follows:

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
  .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
  .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));

I want a new KStream that carries those aggregations as events. I can do that easily like this:

ret.toStream().to("output");

The problem is that every event in the "input" topic produces an event in the "output" topic. I would like to publish an event to the output topic only when a window is finished. For example, if the window is one minute long, send a single event per key per minute.

I think I could do it like this:

ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));

But I wonder if there is a better, more elegant way of doing this?

Recommended answer

You can use KTable.suppress, a KTable feature introduced in Kafka Streams 2.1.

This method lets you get exactly one final result per window/key for windowed computations.
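To make "one final result per window/key" concrete, here is a minimal plain-Java sketch of the buffering idea. It is a conceptual model only, not how Kafka Streams implements suppress: the class name FinalResultBuffer, the "key@windowStart" string encoding, and the summing aggregate are all hypothetical choices made for illustration. It assumes non-negative timestamps and tumbling windows, and it treats a window as closed once the highest timestamp seen so far reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only (hypothetical class, not part of Kafka Streams).
final class FinalResultBuffer {
    private final long windowSizeMs;
    private final long graceMs;
    // Latest aggregate per "key@windowStart", held back until the window closes.
    private final Map<String, Long> pending = new LinkedHashMap<>();
    // Highest record timestamp observed so far.
    private long streamTime = Long.MIN_VALUE;

    FinalResultBuffer(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Feeds one record and returns the final results of every window it closed.
    List<String> process(String key, long timestampMs, long value) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);

        // Update the buffered aggregate unless the record's window is already closed.
        if (windowStart + windowSizeMs + graceMs > streamTime) {
            pending.merge(key + "@" + windowStart, value, Long::sum);
        }

        // Emit and evict every buffered window whose close time has been reached.
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String id = entry.getKey();
            long start = Long.parseLong(id.substring(id.lastIndexOf('@') + 1));
            if (start + windowSizeMs + graceMs <= streamTime) {
                emitted.add(id + "=" + entry.getValue());
                it.remove();
            }
        }
        return emitted;
    }
}
```

In this model nothing is emitted while records keep arriving inside the same window; the single result for a window appears only when a later record pushes the observed time past the window's close. The same applies to the real operator: time advances with incoming records, so a final result is not emitted until a newer record arrives.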

More about suppress in KIP-328.

You can update your implementation with suppress like this:

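// Needs: import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
//        import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
KTable<Windowed<Long>, Counter> ret = input.groupByKey()
        .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
        .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()))
        // Hold back intermediate updates; forward only the final result of each window.
        .suppress(untilWindowCloses(BufferConfig.unbounded()));

ret.toStream().to("output"); // the stream now writes to the output topic only when a window closes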
