A windowed aggregation on event count
Question
I have grouped my Kafka events:
```java
private static void createImportStream(final StreamsBuilder builder, final Collection<String> topics) {
    final KStream<byte[], GraphEvent> stream =
            builder.stream(topics, Consumed.with(Serdes.ByteArray(), new UserEventThriftSerde()));
    stream.filter((key, request) -> Objects.nonNull(request))
          .groupBy(
                  (key, value) -> Integer.valueOf(value.getSourceType()),
                  Grouped.with(Serdes.Integer(), new UserEventThriftSerde()))
          .aggregate(
                  ArrayList::new,
                  (key, value, aggregatedValue) -> {
                      aggregatedValue.add(value);
                      return aggregatedValue;
                  },
                  Materialized.with(Serdes.Integer(), new ArrayListSerde<UserEvent>(new UserEventThriftSerde())))
          .toStream();
}
```
How can I add a window, but based on the number of events rather than on time? The reason is that the events arrive as a bulk dump; a time-windowed aggregation would not fit, since all events could appear within the same few seconds.
Answer
Kafka Streams does not support count-based windows out of the box, because they are non-deterministic and it is hard to handle out-of-order data.
Instead of using the DSL, though, you can use the Processor API to build a custom operator for your use case.
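As a minimal, framework-free sketch of the count-based logic such a custom processor would hold in its state store: buffer events per key and emit a "window" downstream once the buffer reaches a fixed size. The class and callback names here (`CountWindowBuffer`, `onBatch`) are illustrative stand-ins, not part of the Kafka Streams API — in a real Processor, `onBatch` would be replaced by `context.forward(...)` and the map by a persistent state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch of a count-based "window": accumulate values per key
// and fire a callback with a completed batch every windowSize events.
public class CountWindowBuffer<K, V> {
    private final int windowSize;
    private final Map<K, List<V>> buffers = new HashMap<>();
    private final BiConsumer<K, List<V>> onBatch; // stand-in for context.forward(...)

    public CountWindowBuffer(int windowSize, BiConsumer<K, List<V>> onBatch) {
        this.windowSize = windowSize;
        this.onBatch = onBatch;
    }

    // Called once per incoming record, e.g. from Processor#process(key, value).
    public void add(K key, V value) {
        List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= windowSize) {
            onBatch.accept(key, new ArrayList<>(buffer)); // emit the completed "window"
            buffer.clear();                               // start the next window
        }
    }
}
```

Note that a leftover partial buffer (fewer than `windowSize` events) is never emitted here; a real processor would typically schedule a punctuation to flush stragglers after some idle period.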