How to send the final kafka-streams aggregation result of a time-windowed KTable?


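Problem description

What I'd like to do is this:

  1. Consume records from a topic of numbers (Longs)
  2. Aggregate (count) the values for each 5-second window
  3. Send the FINAL aggregation result to another topic

My code looks like this: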

KStream<String, Long> longs = builder.stream(
            Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
            longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-counts topic.
longCounts.toStream((wk, v) -> wk.key())
          .to("long-counts");

Everything appears to work as expected, but an aggregation result is sent to the destination topic for each incoming record. How can I send only the final aggregation result of each window?

Recommended answer

In Kafka Streams there is no such thing as a "final aggregation". Windows are kept open all the time in order to handle out-of-order records that arrive after the window end time has passed. However, windows are not kept forever: they are discarded once their retention time expires. No special action is triggered when a window is discarded.

See the Confluent documentation for more details: http://docs.confluent.io/current/streams/

Thus, a result record is produced for each update to an aggregation (because Kafka Streams also updates the aggregation result for out-of-order records). Your "final result" is the latest result record emitted before the window is discarded. Depending on your use case, manual de-duplication would be one way to resolve the issue (using the lower-level API, transform() or process()).
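To make the de-duplication idea concrete, here is a minimal sketch in plain Java. It is only a model of the logic, not Kafka Streams code: the class FinalWindowResults, its onRecord method, and the one-second grace period are all hypothetical names and values chosen for illustration. It keeps the latest count per window and releases a window's result only once the highest timestamp seen so far has passed the window end plus the grace period. In a real application, similar logic would presumably live inside transform() or process() with a state store.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of manual de-duplication; it does not use the Kafka Streams API.
class FinalWindowResults {
    static final long WINDOW_SIZE_MS = 5000L; // five-second tumbling windows, as in the question
    static final long GRACE_MS = 1000L;       // assumed allowance for out-of-order records

    // window start -> (key -> latest count); sorted so the oldest window comes first
    private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE; // highest timestamp seen so far

    // Counts one record and returns the results of any windows that have now closed.
    // Timestamps are assumed to be non-negative.
    public List<String> onRecord(String key, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % WINDOW_SIZE_MS);

        // Update the count only if the record's window is still open; otherwise drop the record.
        if (windowStart + WINDOW_SIZE_MS + GRACE_MS > streamTime) {
            windows.computeIfAbsent(windowStart, s -> new LinkedHashMap<>())
                   .merge(key, 1L, Long::sum);
        }

        // Emit and forget every window whose end plus grace period has passed.
        List<String> closed = new ArrayList<>();
        while (!windows.isEmpty()
                && windows.firstKey() + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
            Map.Entry<Long, Map<String, Long>> window = windows.pollFirstEntry();
            for (Map.Entry<String, Long> count : window.getValue().entrySet()) {
                closed.add(count.getKey() + "@" + window.getKey() + "=" + count.getValue());
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        FinalWindowResults results = new FinalWindowResults();
        // The record at 4000 arrives out of order but still lands in the open window [0, 5000).
        long[] timestamps = {1000L, 2000L, 5500L, 4000L, 6000L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> " + results.onRecord("a", ts));
        }
    }
}
```

The design choice here is to trade latency for a single record per window: nothing is emitted for a window until later records push the observed time past its end plus the grace period, and records arriving after that point are dropped.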

This blog post might help, too: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

Another blog post addresses this issue without using punctuations: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

Update

With KIP-328, a KTable#suppress() operator was added. It allows consecutive updates to be suppressed in a strict manner, so that a single result record is emitted per window; the tradeoff is increased latency.
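As a rough illustration, the pipeline from the question might look like the sketch below when written with suppress(). It assumes a newer Kafka Streams release than the one the question was written against (TimeWindows.ofSizeAndGrace is available from 3.0). The class name SuppressedLongCounts, the method buildTopology, and the one-second grace period are assumptions made for this example; the topic names are taken from the question.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Hypothetical holder class; only the DSL calls themselves come from Kafka Streams.
class SuppressedLongCounts {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Long> longs =
                builder.stream("longs", Consumed.with(Serdes.String(), Serdes.Long()));

        // Count by key on a five-second tumbling window. The one-second grace
        // period (an assumed value) bounds how long out-of-order records are accepted.
        KTable<Windowed<String>, Long> longCounts = longs
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .windowedBy(TimeWindows.ofSizeAndGrace(
                        Duration.ofSeconds(5), Duration.ofSeconds(1)))
                .count()
                // Hold back intermediate updates and emit one record per window
                // once the window has closed (window end plus grace period).
                .suppress(Suppressed.untilWindowCloses(
                        Suppressed.BufferConfig.unbounded()));

        // Sink the single result per window to the long-counts topic.
        longCounts.toStream((wk, v) -> wk.key())
                  .to("long-counts", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

A window closes only when stream time advances past its end plus the grace period, so a result appears only after later records have arrived. The unbounded buffer keeps all pending windows in memory, which is part of the latency and resource tradeoff mentioned above.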
