Suppress triggers events only when new events are received on the stream

Problem description

I am using Kafka Streams 2.2.1.

I am using suppress to hold back events until a window closes, with event-time semantics. However, the suppressed results are only emitted once a new message becomes available on the stream.

The following extracted code reproduces the problem:

    // Split the input stream ("is") on the second comma-separated field of each message.
    KStream<UUID, String>[] branches = is
            .branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
                    (key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
                    (key, value) -> true);

    KStream<UUID, String> sideA = branches[0];
    KStream<UUID, String> sideB = branches[1];

    // Keep the first value per key and window, and emit it only after the window closes.
    KStream<Windowed<UUID>, String> sideASuppressed =
            sideA.groupByKey(Grouped.with(new MyUUIDSerde(), Serdes.String()))
                    .windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
                    .reduce((v1, v2) -> v1)
                    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                    .toStream();

Messages are only emitted from 'sideASuppressed' when a new message reaches the 'sideA' stream. Messages arriving on 'sideB' do not cause suppress to emit anything, even if the window closed a long time ago. In production the problem is unlikely to occur often because of the high volume, but there are enough cases where it is essential not to wait for a new message to reach the 'sideA' stream.

Thanks in advance.

Recommended answer

According to the Kafka Streams documentation:

Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available. If at least one partition does not have any new data available, stream-time will not be advanced and thus punctuate() will not be triggered if PunctuationType.STREAM_TIME was specified. This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor does not enable wall-clock triggering of punctuate().

I am not sure why this is the case, but it explains why suppressed messages are only emitted when new messages are available in the queue that the suppress operator reads from: without new data, stream time does not advance, so the window is never seen as closed.
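To make the mechanism concrete, here is a minimal sketch in plain Java. It is a toy model, not Kafka's actual implementation: the class ToySuppressBuffer and its methods are hypothetical names invented for illustration. It assumes that stream time is the highest record timestamp seen so far, and that a window is released once stream time reaches window end plus grace. The window size and grace period match the 31 and 32 minutes used in the question.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only (hypothetical class, not part of Kafka Streams).
// Assumes non-negative timestamps in milliseconds.
final class ToySuppressBuffer {
    private final long windowSizeMs;
    private final long graceMs;
    // Stream time: the highest record timestamp this operator has seen.
    private long streamTimeMs = Long.MIN_VALUE;
    // Buffered results keyed by window start. The first value per window
    // wins, mirroring reduce((v1, v2) -> v1) in the question.
    private final TreeMap<Long, String> buffered = new TreeMap<>();

    ToySuppressBuffer(Duration windowSize, Duration grace) {
        this.windowSizeMs = windowSize.toMillis();
        this.graceMs = grace.toMillis();
    }

    // Processes one record and returns the values of all windows that
    // closed as a result. Nothing is ever emitted between calls.
    List<String> process(long timestampMs, String value) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        // Buffer the record only if its window is still open.
        if (windowStart + windowSizeMs + graceMs > streamTimeMs) {
            buffered.putIfAbsent(windowStart, value);
        }
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = buffered.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            long closeTimeMs = entry.getKey() + windowSizeMs + graceMs;
            if (closeTimeMs > streamTimeMs) {
                break; // windows are sorted, so later ones close even later
            }
            emitted.add(entry.getValue());
            it.remove();
        }
        return emitted;
    }

    int pendingWindows() {
        return buffered.size();
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        ToySuppressBuffer buffer =
                new ToySuppressBuffer(Duration.ofMinutes(31), Duration.ofMinutes(32));
        // The first record lands in window [0, 31 min); nothing can be emitted yet.
        System.out.println(buffer.process(0, "a1"));           // prints []
        // However long we wait here, the window stays buffered, because only
        // a call to process() can move stream time forward.
        // A record at minute 63 pushes stream time to 31 + 32 minutes,
        // which closes the first window and releases its value.
        System.out.println(buffer.process(63 * minute, "a2")); // prints [a1]
    }
}
```

In this model, the only way to release a buffered window is to call process() with a sufficiently late timestamp, which corresponds to the behavior described in the question: records on 'sideB' never reach the suppress operator on 'sideA', so they cannot advance its stream time.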

If anyone can explain why it is implemented this way, I would be happy to learn. This behavior forces my implementation to emit extra messages just to get the suppressed message emitted in time, which makes the code much less readable.
