Kafka SessionWindow with suppress only sends final event when there is a steady stream of input records

Problem description

It appears as though a Kafka Streams application with a session window, a grace period and suppression fails to output a final event if there is no constant stream of input records.

Context: We are using change data capture (CDC) to monitor changes to a legacy database. When a user makes changes using the UI, a database transaction will change 1..n tables. Each SQL statement results in a Kafka record. These need to be aggregated in order to create one "trigger record", which is used to start an expensive process. The process should be started within a second of the transaction in the legacy database being committed. There are only a handful of users working with the old application, so there can be a significant amount of time between transactions.

We have a Kafka Streams application which uses a session window with an inactivity gap of 400 ms to aggregate the incoming records that share the same key (the transaction ID) and to output the trigger record.

We have a working solution, but the trigger record is only written to the output topic as long as other transactions are running and generating a steady stream of incoming records. We need the window to close and the trigger record to be written even if there are no further input records.

The working code is here: https://github.com/maxant/kafka-data-consistency/blob/714a44689fd48aa28b05b855638ac7ed50dd5ee9/partners/src/main/java/ch/maxant/ThroughputTestners.#L65

Here is a summary of that code:

      stream.groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
            .aggregate(...)
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((k,v) -> k.key())
            .to("throughput-test-aggregated");

Initially I had no suppression and no grace period. Using just the default configuration, I always received a final event for the window containing all the aggregated records, but it arrived up to 6 seconds after the 400 ms window, which is too long for us to wait.

In order to reduce the latency, I set CACHE_MAX_BYTES_BUFFERING_CONFIG to 1, but that caused an output record to be produced after each aggregation step, rather than just a single final output record.
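
A minimal sketch of how that setting might be applied, assuming the usual Properties-based streams configuration (the application ID and bootstrap servers below are placeholders, not taken from the question):

// requires java.util.Properties and org.apache.kafka.streams.StreamsConfig
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "throughput-test");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// effectively disables the record cache, so aggregation results are forwarded downstream immediately
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1);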

I introduced the suppression (and with it, a grace period of 0 ms) in order to ensure that only one output record is created.

The problem now is that I only receive an output record if new input records arrive after the window has closed (regardless of their key).

The test creates 10 input records, all with the same key, 10 ms apart, all within 100 ms. It then rests for 3 seconds, allowing me to shut it off after one group of ten records. I expect to receive one output record, but none arrives unless I leave the test running to create a second group of input records. The problem is reproducible.
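
For illustration, a minimal sketch of such a test producer; the topic name, key, value contents and serializers here are assumptions, not taken from the linked code:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BurstProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String txId = "tx-1"; // all records share the same key (the transaction ID)
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("throughput-test-source", txId, "change-" + i));
                Thread.sleep(10); // 10 ms apart, all within ~100 ms
            }
            producer.flush();
            Thread.sleep(3000); // rest for 3 seconds: no further input records arrive
        }
    }
}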

I have read the following articles but cannot find anything which describes what I am seeing, namely that the final record is only sent to the output topic once additional records (regardless of key) are processed.

What do I need to change so that the final record is sent to my output topic, even if no further records are processed?

(Using Kafka 2.4.1 with client and server on Linux)

Recommended answer

Update: I had an error in the topology; it is fixed now.

I had exactly the same problem as you when using suppress, and it is expected behaviour. Suppress only supports emitting buffered records based on stream time, not wall-clock time, so if you stop getting new records, stream time is frozen and suppress will not emit the last suppressed window.

The solution I used is to write a custom suppression using the Processor API (a Transformer, so you can still use the DSL to send the suppressed records downstream) with a state store used as a buffer, then check which windows should be flushed (emitted) to the downstream processor whenever a new record comes in or after a time interval has passed (using a WALL_CLOCK_TIME punctuator).

The Transformer looks like this:

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueStore;

public class SuppressWindowTransformer implements Transformer<Windowed<String>, String, KeyValue<Windowed<String>, String>> {
    private ProcessorContext context;
    private Cancellable cancellable;
    private KeyValueStore<Windowed<String>, String> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        kvStore = (KeyValueStore<Windowed<String>, String>) context.getStateStore("suppressed_windowed_transaction");
        // flush old windows periodically based on wall-clock time, independently of incoming records
        cancellable = context.schedule(Duration.ofMillis(100), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushOldWindow());
    }

    @Override
    public KeyValue<Windowed<String>, String> transform(Windowed<String> key, String value) {
        kvStore.put(key, value); // buffer (or suppress) the new incoming window
        flushOldWindow();
        return null; // nothing is forwarded here; forwarding happens in flushOldWindow()
    }

    private void flushOldWindow() {
        // your logic to check for old windows in kvStore, then flush them:
        // forward (or "unsuppress") the suppressed records downstream using context.forward(key, value)
    }

    @Override
    public void close() {
        cancellable.cancel(); // cancel the punctuation
    }
}
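
The flush logic itself is left open above. A minimal sketch of what flushOldWindow() might look like inside that Transformer, assuming a window is considered closed once its end time plus the 400 ms inactivity gap lies behind the current wall-clock time (the gap is hard-coded here for illustration; additionally requires java.util.ArrayList, java.util.List and org.apache.kafka.streams.state.KeyValueIterator):

// a possible implementation of flushOldWindow(), not part of the original answer
private void flushOldWindow() {
    long now = System.currentTimeMillis();
    List<Windowed<String>> expired = new ArrayList<>();
    try (KeyValueIterator<Windowed<String>, String> it = kvStore.all()) {
        while (it.hasNext()) {
            KeyValue<Windowed<String>, String> entry = it.next();
            // assume the session is over once its end plus the 400 ms inactivity gap is in the past
            if (entry.key.window().end() + 400 < now) {
                context.forward(entry.key, entry.value); // emit ("unsuppress") the final record
                expired.add(entry.key);
            }
        }
    }
    expired.forEach(kvStore::delete); // remove emitted windows from the buffer
}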

And in your Stream DSL:

stream.groupByKey()
      .windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
      .aggregate(...) // remove the suppress operator and write a custom suppress using the Processor API
      .toStream()
      .transform(SuppressWindowTransformer::new, "suppressed_windowed_transaction")
      .to("throughput-test-aggregated");
