Kafka SessionWindow with suppress only sends final event when there is a steady stream of input records


Question


It appears as though a Kafka Stream with a session window with a grace period and suppression fails to output a final event if there is no constant stream of input records.


Context: We are using change data capture (CDC) to monitor changes to a legacy database. When a user makes changes using the UI, a database transaction will change 1..n tables. Each SQL statement results in a Kafka record. These need to be aggregated in order to create one "trigger record" which is used to start an expensive process. The process should be started within a second of the transaction in the legacy database being committed. There are only a handful of users working with the old application, and so there can be a significant amount of time between transactions.


We have a Kafka Stream application which uses a session window and an inactivity gap of 400ms in order to aggregate the incoming records that share the same key (the transaction ID), and output the trigger record.


We have a working solution but the trigger record is only written to the output topic so long as other transactions are running in order to generate a steady stream of incoming records. We need the window to close and the trigger record to be written, even if there are no further input records.

The working code is here: https://github.com/maxant/kafka-data-consistency/blob/714a44689fd48aa28b05b855638ac7ed50dd5ee9/partners/src/main/java/ch/maxant/kdc/partners/ThroughputTestStream.java#L65

Here is a summary of that code:

      stream.groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
            .aggregate(...)
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((k,v) -> k.key())
            .to("throughput-test-aggregated");


Initially I had no suppression and no grace period. Using just the default configuration, I always received a final event for the window containing all the aggregated records, but it was taking up to 6 seconds after the 400ms window, which is too long for us to wait.


In order to reduce the latency and speed things up, I set CACHE_MAX_BYTES_BUFFERING_CONFIG to 1, but that caused an output record after each aggregation, rather than just a single output record.
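For reference, disabling the record cache is done through the streams configuration. A minimal sketch (the application id and bootstrap server are placeholders, not taken from the original post):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CacheConfigSketch {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "throughput-test");      // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder broker
        // Shrink the record cache so aggregation results are forwarded almost immediately
        // instead of being batched; this is what produced one output per aggregation step.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1);
        return props;
    }
}
```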


I introduced the suppression (and with it, a grace period of 0ms), in order to ensure that only one output record is created.

The problem now is that I only receive an output record if new input records arrive after the window closes, regardless of their key.


The test creates 10 input records all with the same key, 10ms apart, all within 100ms. It then rests for 3 seconds, allowing me to shut it off after one group of ten records. I expect to receive one output record, but none arrives, unless I leave the test running, to create a second group of input records. This problem is reproducible.


I have read the following articles but cannot find anything which describes what I am seeing, namely that the final record is only sent to the output topic once additional records (regardless of key) are processed.

  • https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/
  • https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
  • https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html

What do I need to change so that the final record is sent to my output topic, even if no further records are processed?


(Using Kafka 2.4.1 with client and server on Linux)

Answer


Update: I had an error in the topology; it is fixed below.


I had exactly the same problem as you when using suppress, and it is expected behavior: suppress only supports emitting buffered records based on stream time, not wall-clock time. If you stop receiving new records, stream time is frozen and suppress will never emit the last suppressed window.


The solution I used is to write a custom suppression using the Processor API (using a Transformer, so you can stay in the DSL to send the suppressed records downstream), with a state store used as a buffer. It then checks which windows should be flushed (emitted) downstream whenever a new record comes in, or after a time interval has passed (using a WALL_CLOCK_TIME punctuator).


The Transformer would look like this:

public class SuppressWindowTransformer implements Transformer<Windowed<String>, String, KeyValue<Windowed<String>, String>> {
    private ProcessorContext context;
    private Cancellable cancellable;
    private KeyValueStore<Windowed<String>, String> kvStore;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        kvStore = (KeyValueStore) context.getStateStore("suppressed_windowed_transaction");
        // Wall-clock punctuator: fires every 100ms even when no new records arrive
        cancellable = context.schedule(Duration.ofMillis(100), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushOldWindow());
    }

    @Override
    public KeyValue<Windowed<String>, String> transform(Windowed<String> key, String value) {
        kvStore.put(key, value); // buffer (i.e. suppress) the incoming window update
        flushOldWindow();
        return null; // nothing is forwarded directly; flushOldWindow() forwards instead
    }

    private void flushOldWindow() {
        // your logic to check for old windows in kvStore, then flush them:
        // forward (i.e. un-suppress) the suppressed records downstream using context.forward(key, value)
    }

    @Override
    public void close() {
        cancellable.cancel(); // cancel the punctuator
    }
}
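The decision inside flushOldWindow() can be kept as plain logic, independent of Kafka: a window is safe to emit once its end timestamp (plus any grace period) lies in the past relative to wall-clock time. A minimal sketch of such a helper (the class and method names are my own illustration, not from the original answer):

```java
// Pure flush-decision logic for the custom suppression,
// kept free of Kafka types so it can be unit-tested on its own.
public final class FlushDecision {

    private FlushDecision() {}

    // A buffered window may be flushed once its end timestamp plus the
    // grace period is strictly older than the current wall-clock time.
    public static boolean shouldFlush(long windowEndMs, long graceMs, long nowMs) {
        return windowEndMs + graceMs < nowMs;
    }
}
```

Inside flushOldWindow() you would then iterate kvStore.all(), call shouldFlush(key.window().end(), graceMs, System.currentTimeMillis()) for each entry, forward the entries that pass, and delete them from the store so they are emitted only once.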


And in your Stream DSL:

stream.groupByKey()
      .windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
      .aggregate(...) // remove the suppress operator; custom suppression happens in the Transformer
      .toStream()
      .transform(SuppressWindowTransformer::new, "suppressed_windowed_transaction")
      .to("throughput-test-aggregated");
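The state store named in transform() must be registered on the topology before it can be used; the snippet above assumes that has already happened. A sketch of that wiring, assuming String keys and values as in the example (the serde choice via WindowedSerdes is my assumption, not shown in the original answer):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class StoreWiringSketch {
    public static void registerStore(StreamsBuilder builder) {
        // The store name must match the one used in the Transformer's init()
        // and in the transform(...) call on the stream.
        StoreBuilder<KeyValueStore<Windowed<String>, String>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("suppressed_windowed_transaction"),
                WindowedSerdes.sessionWindowedSerdeFrom(String.class), // serde for Windowed<String> keys
                Serdes.String());                                      // serde for the aggregated value

        builder.addStateStore(storeBuilder);
    }
}
```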

