End-of-window outer join with KafkaStreams

Question

I have a Kafka topic where I expect messages with two different key types, old and new, i.e. "1-new", "1-old", "2-new", "2-old". Keys are unique, but some might be missing.

Now, using Kotlin and the KafkaStreams API, I can log the messages that have the same key id in both new and old:

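    // Join window: a new and an old record with the same id match if their
    // timestamps are at most 2 minutes apart (long-millisecond overload; newer
    // Kafka Streams releases take a Duration instead)
    val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())

    // Keep the "<id>-new" records and strip the marker so the key is just the id
    val newStream = stream.filter { key, _ -> isNew(key) }
            .map { key, value -> KeyValue(key.replace(NEW_PREFIX, ""), value) }

    // Same for the "<id>-old" records
    val oldStream = stream.filter { key, _ -> isOld(key) }
            .map { key, value -> KeyValue(key.replace(OLD_PREFIX, ""), value) }

    // Inner join: emits a result only for ids where both records arrived within the window
    val joined = newStream.join(oldStream,
            { value1, value2 -> "$value1&$value2" }, windows)

    joined.foreach { key, value ->
        log.info { "JOINED $key : $value" }
    }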
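The snippet relies on `isNew`, `isOld`, `NEW_PREFIX` and `OLD_PREFIX`, which the question does not show. A minimal hypothetical definition, assuming keys look exactly like "<id>-new" and "<id>-old" (so despite the names, the "prefixes" are really suffixes of the key):

```kotlin
// Hypothetical helpers: the question does not show these definitions.
// Assumes keys look like "<id>-new" / "<id>-old", so despite the names the
// "prefixes" are really suffixes of the key.
val NEW_PREFIX = "-new"
val OLD_PREFIX = "-old"

// True for keys such as "1-new"
fun isNew(key: String): Boolean = key.endsWith(NEW_PREFIX)

// True for keys such as "1-old"
fun isOld(key: String): Boolean = key.endsWith(OLD_PREFIX)
```

With these, `key.replace(NEW_PREFIX, "")` in the question turns "1-new" into "1", so both streams end up keyed by the bare id before the join.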

Now I want to know which new/old keys are missing from the time window for some reason. Is it possible to achieve this with the KafkaStreams API?

In my case, I want to report id 1 as suspicious only when key "1-old" is received and "1-new" does not arrive within 2 minutes.

Answer

The DSL might not give you exactly what you want. However, you can use the Processor API. Having said this, a leftJoin (with the old stream on the left) can actually do the "heavy lifting". Thus, after the leftJoin you can use .transform(...) with an attached state store to "clean up" the data further.

For each old&null record you receive, put it into the store. If you later receive old&new for the same key, you can remove it from the store. Furthermore, you register a punctuation, and on each punctuation call you scan the store for entries that are "old enough" that you are sure no later old&new join result will be produced. For those entries, you emit old&null and remove them from the store.
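The store-and-punctuate cleanup can be modeled in plain Kotlin to make the bookkeeping concrete. This is a hypothetical sketch, not Kafka Streams API code: a `MutableMap` stands in for the attached key-value store, `punctuate(now)` stands in for the scheduled punctuation, and the names `OldWithoutNewDetector`, `onJoinResult` and `windowMs` are invented. It assumes `oldStream.leftJoin(newStream, ...)` on a Kafka Streams version that emits old&null eagerly (as the answer describes), and it ignores grace periods and out-of-order records.

```kotlin
// Hypothetical model of the transform(...) that runs after
// oldStream.leftJoin(newStream, ...). A MutableMap stands in for the attached
// key-value store; punctuate() stands in for the scheduled punctuation.
class OldWithoutNewDetector(private val windowMs: Long) {
    // id -> timestamp of an old&null result that has not been resolved yet
    private val pending = mutableMapOf<String, Long>()

    // Called for every leftJoin result; newValue == null means old&null.
    fun onJoinResult(id: String, newValue: String?, timestamp: Long) {
        if (newValue == null) {
            pending[id] = timestamp // no matching new record (yet): remember it
        } else {
            pending.remove(id)      // old&new arrived: nothing to report
        }
    }

    // Returns ids whose old record is older than the join window, so no later
    // old&new result can be produced for them, and drops them from the store.
    // Strict '>' because a record exactly windowMs apart still joins.
    fun punctuate(now: Long): List<String> {
        val expired = pending.filterValues { now - it > windowMs }.keys.sorted()
        expired.forEach { pending.remove(it) }
        return expired
    }
}
```

For the question's 2-minute window you would construct it with `windowMs = 120_000`; every id returned by `punctuate` is one whose "-old" record never got a matching "-new" record in time.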

As an alternative, you can also omit the join and do everything in a single stateful transform(). For this, you would need to KStream#merge() the old and new streams and call transform() on the merged stream.
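A plain-Kotlin model of this merge-based alternative, again with maps standing in for state stores. The names `MergedStreamDetector`, `process` and `punctuate` are hypothetical. It assumes the merged stream still carries the original "<id>-old"/"<id>-new" keys and that two records with the same id match when their timestamps are at most `windowMs` apart, mirroring the 2-minute JoinWindows in the question.

```kotlin
import kotlin.math.abs

// Hypothetical model of a single stateful transform() over the merged
// old+new stream, without any join. Maps stand in for state stores.
class MergedStreamDetector(private val windowMs: Long) {
    private val oldSeen = mutableMapOf<String, Long>() // id -> unmatched old timestamp
    private val newSeen = mutableMapOf<String, Long>() // id -> unmatched new timestamp

    fun process(key: String, timestamp: Long) {
        val id = key.substringBeforeLast('-') // "1-old" -> "1"
        when {
            key.endsWith("-old") -> {
                val newTs = newSeen[id]
                // A new record may have arrived first; match it if close enough
                if (newTs != null && abs(timestamp - newTs) <= windowMs) newSeen.remove(id)
                else oldSeen[id] = timestamp
            }
            key.endsWith("-new") -> {
                val oldTs = oldSeen[id]
                if (oldTs != null && abs(timestamp - oldTs) <= windowMs) oldSeen.remove(id)
                else newSeen[id] = timestamp
            }
        }
    }

    // Reports old records that can no longer be matched and cleans up both sides.
    fun punctuate(now: Long): List<String> {
        newSeen.values.removeAll { now - it > windowMs }
        val suspicious = oldSeen.filterValues { now - it > windowMs }.keys.sorted()
        suspicious.forEach { oldSeen.remove(it) }
        return suspicious
    }
}
```

Unlike the leftJoin variant, this one has to remember unmatched new records too, because after the merge a "1-new" can arrive before "1-old". Expiring both maps in `punctuate` keeps the state from growing without bound.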

Note: instead of registering a punctuation, you can also put the "scan logic" into the transform and execute it each time you process a record.
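A sketch of that variant, modeled in plain Kotlin: no punctuation is registered, and the scan runs inside every call, using the highest record timestamp seen so far ("stream time") as the clock. The class and method names are hypothetical, a `MutableMap` stands in for the state store, and the input is assumed to be the old&null / old&new results of the leftJoin.

```kotlin
// Hypothetical variant of the cleanup transform: the "scan logic" runs on
// every record instead of in a punctuation, driven by stream time.
class ScanOnEachRecord(private val windowMs: Long) {
    private val pending = mutableMapOf<String, Long>() // id -> old&null timestamp
    private var streamTime = Long.MIN_VALUE

    // Handles one leftJoin result and returns the ids that just became suspicious.
    fun process(id: String, newValue: String?, timestamp: Long): List<String> {
        streamTime = maxOf(streamTime, timestamp)
        if (newValue == null) pending[id] = timestamp else pending.remove(id)
        val expired = pending.filterValues { streamTime - it > windowMs }.keys.sorted()
        expired.forEach { pending.remove(it) }
        return expired
    }
}
```

The trade-off is that nothing is reported until another record arrives: if the topic goes quiet, suspicious ids stay in the store, whereas a wall-clock punctuation would still fire. Scanning the whole store on every record also costs more when the store is large.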
