Suppress triggers events only when new events are received on the stream

Question

I am using Kafka Streams 2.2.1.

I am using suppress to hold back events until a window closes, with event-time semantics. However, the suppressed results are only emitted once a new message becomes available on the stream.

The following code, extracted from the application, reproduces the problem:

    // 'is' is the input stream (defined elsewhere); the second comma-separated
    // field of each value selects the branch.
    KStream<UUID, String>[] branches = is
            .branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
                    (key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
                    (key, value) -> true);

    KStream<UUID, String> sideA = branches[0];
    KStream<UUID, String> sideB = branches[1];

    // Keep the first value per key and window; emit only after the window closes.
    KStream<Windowed<UUID>, String> sideASuppressed =
            sideA.groupByKey(Grouped.with(new MyUUIDSerde(), Serdes.String()))
                    .windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
                    .reduce((v1, v2) -> v1)
                    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                    .toStream();

Messages are only emitted from 'sideASuppressed' when a new message arrives on the 'sideA' stream (messages arriving on 'sideB' do not cause suppress to emit anything, even if the window closed a long time ago). In production the problem is unlikely to occur often because of the high volume, but there are enough cases where it is essential not to wait for a new message on 'sideA'.

Thanks.

Recommended answer

According to the Kafka Streams documentation:

Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available. If at least one partition does not have any new data available, stream-time will not be advanced and thus punctuate() will not be triggered if PunctuationType.STREAM_TIME was specified. This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor does not enable wall-clock triggering of punctuate().
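The quoted rule can be pictured with a toy model. This is only a sketch of the sentence above, not Kafka's actual code: the class `StreamTimeModel` and its methods are made up for illustration, and it assumes stream time is the smallest of the latest timestamps seen on each partition.

```java
import java.util.Arrays;

public class StreamTimeModel {
    // Latest timestamp seen per partition; -1 means "no data yet" (assumption).
    private final long[] latest;

    public StreamTimeModel(int partitions) {
        latest = new long[partitions];
        Arrays.fill(latest, -1L);
    }

    /** Records a timestamp for one partition and returns the resulting stream time. */
    public long observe(int partition, long timestampMs) {
        latest[partition] = Math.max(latest[partition], timestampMs);
        return streamTime();
    }

    /** Stream time is held back by the partition that is furthest behind. */
    public long streamTime() {
        long min = Long.MAX_VALUE;
        for (long t : latest) {
            if (t < 0) {
                return -1L; // a partition without data stalls stream time entirely
            }
            min = Math.min(min, t);
        }
        return min;
    }
}
```

In this model, new data on one partition alone never moves stream time forward; it only advances once the slowest partition receives a newer timestamp.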

I am not sure why it is implemented this way, but it explains why suppressed messages are only emitted when new messages are available in the queue that suppress reads from.
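To make the effect concrete, here is a simplified plain-Java model of a suppress buffer driven by stream time. It is a sketch under stated assumptions, not the Kafka Streams implementation: `SuppressModel` is a hypothetical class, it handles a single key, it assumes tumbling windows aligned to the epoch, and it assumes a window is released once stream time reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SuppressModel {
    private final long windowSizeMs;
    private final long graceMs;
    // Stream time only moves when process() sees a record, never with the wall clock.
    private long streamTime = Long.MIN_VALUE;
    // Window start -> first value seen, mirroring reduce((v1, v2) -> v1).
    private final TreeMap<Long, String> buffer = new TreeMap<>();

    public SuppressModel(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Feeds one record and returns the values of all windows that closed as a result. */
    public List<String> process(long timestampMs, String value) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        // Records for windows that are already closed are dropped as late.
        if (windowStart + windowSizeMs + graceMs > streamTime) {
            buffer.putIfAbsent(windowStart, value);
        }
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() + windowSizeMs + graceMs > streamTime) {
                break; // this window and all later ones are still open
            }
            emitted.add(entry.getValue());
            it.remove();
        }
        return emitted;
    }

    /** Number of windows still held back. */
    public int pending() {
        return buffer.size();
    }
}
```

With a 31-minute window and a 32-minute grace period, a record at minute 0 stays buffered no matter how much wall-clock time passes; it is released only when another record with a timestamp of at least minute 63 passes through the same model.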

If anyone can explain why it is implemented this way, I will be happy to learn. This behavior forces my implementation to emit extra messages just to get the suppressed message emitted in time, which makes the code much less readable.
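For the extra-message workaround, the timestamp of that extra message matters. The helper below is a hypothetical sketch, not part of any Kafka API: it computes the smallest event timestamp such a message would need in order to close the window containing a given event, assuming non-negative timestamps, tumbling windows aligned to the epoch, and emission once stream time reaches window end plus grace.

```java
public class FlushTimestamp {
    /** Start of the epoch-aligned tumbling window that contains timestampMs. */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /**
     * Smallest timestamp an extra record needs so that stream time reaches
     * window end + grace for the window containing eventTimestampMs.
     */
    public static long minFlushTimestamp(long eventTimestampMs, long windowSizeMs, long graceMs) {
        return windowStart(eventTimestampMs, windowSizeMs) + windowSizeMs + graceMs;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Window and grace sizes taken from the question: 31 and 32 minutes.
        long flushAt = minFlushTimestamp(5 * minute, 31 * minute, 32 * minute);
        System.out.println("Extra record needs timestamp >= " + flushAt + " ms");
    }
}
```

Going by the behavior described in the question, the extra record would presumably also have to travel through 'sideA' itself, since messages on 'sideB' do not trigger emission. It then lands in a later window of its own, so downstream code has to recognize and discard it.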
