A windowed aggregation on event count


Problem description

I have grouped my Kafka events:

    private static void createImportStream(final StreamsBuilder builder, final Collection<String> topics) {
        final KStream<byte[], GraphEvent> stream =
                builder.stream(topics, Consumed.with(Serdes.ByteArray(), new UserEventThriftSerde()));
        stream.filter((key, request) -> Objects.nonNull(request))  // drop records that failed deserialization
              .groupBy(
                      (key, value) -> Integer.valueOf(value.getSourceType()),
                      Grouped.with(Serdes.Integer(), new UserEventThriftSerde()))
              .aggregate(ArrayList::new,
                      (key, value, aggregatedValue) -> {
                          aggregatedValue.add(value);
                          return aggregatedValue;
                      },
                      Materialized.with(Serdes.Integer(), new ArrayListSerde<UserEvent>(new UserEventThriftSerde())))
              .toStream();
    }

How can I add a window that is based not on time but on the number of events? The reason is that the events arrive as a bulk dump, so a time-windowed aggregation would not fit: all events could appear within the same few seconds.

Answer

Kafka Streams does not support count-based windows out of the box, because they are non-deterministic and it is hard to handle out-of-order data with them.

Instead of using the DSL, though, you can use the Processor API to build a custom operator for your use case.
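As a rough illustration of what such a custom operator could do, here is a minimal sketch of the count-based buffering logic (the class name, window size, and callback are made up for this example). Inside a real Kafka Streams `Processor`, the buffer would typically live in a state store for fault tolerance, and the downstream callback would be `context.forward(...)`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Buffers values and emits them as one batch once a count threshold is
 * reached. A hypothetical stand-in for the state a custom Processor would
 * keep; not part of the Kafka Streams API.
 */
public class CountWindow<V> {
    private final int windowSize;
    private final Consumer<List<V>> downstream; // in a Processor: context.forward(...)
    private List<V> buffer = new ArrayList<>();

    public CountWindow(final int windowSize, final Consumer<List<V>> downstream) {
        this.windowSize = windowSize;
        this.downstream = downstream;
    }

    /** Called once per record, e.g. from Processor#process. */
    public void add(final V value) {
        buffer.add(value);
        if (buffer.size() >= windowSize) {
            flush();
        }
    }

    /** Emits the current batch downstream and starts a new window. */
    public void flush() {
        if (!buffer.isEmpty()) {
            downstream.accept(buffer);
            buffer = new ArrayList<>();
        }
    }

    public static void main(final String[] args) {
        final List<List<String>> batches = new ArrayList<>();
        final CountWindow<String> window = new CountWindow<>(3, batches::add);
        for (final String e : new String[]{"a", "b", "c", "d"}) {
            window.add(e);
        }
        System.out.println(batches); // first full window of 3 has been emitted
        window.flush();              // emit the partial last window, e.g. at end of the dump
        System.out.println(batches);
    }
}
```

Note that a count-based window never closes on its own when fewer than `windowSize` events remain, so you need an explicit trigger for the final partial batch, for example a punctuation callback or an end-of-dump marker.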

