End-of-window outer join with KafkaStreams

Question

I have a Kafka topic where I expect messages with two different key types, old and new, e.g. "1-new", "1-old", "2-new", "2-old". Keys are unique, but some might be missing.

Using Kotlin and the KafkaStreams API, I can log the messages that have the same key id in both new and old:

    // Records join only if their timestamps are at most 2 minutes apart.
    val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())

    // Strip the new/old marker so both streams are keyed by the bare id.
    val newStream = stream.filter({ key, _ -> isNew(key) })
            .map({key, value ->  KeyValue(key.replace(NEW_PREFIX, ""), value) })

    val oldStream = stream.filter({ key, _ -> isOld(key) })
            .map({key, value ->  KeyValue(key.replace(OLD_PREFIX, ""), value) })

    // Inner join: emits a result only when both sides arrive within the window.
    val joined = newStream.join(oldStream,
            { value1, value2 -> "$value1&$value2" }, windows)

    joined.foreach({ key, value ->
        log.info { "JOINED $key : $value" }
    })

Now I want to know which new/old keys are missing within the time window for some reason. Is this possible to achieve with the KafkaStreams API?

In my case, I want to report id 1 as suspicious only when key "1-old" is received and "1-new" does not arrive within 2 minutes.

Recommended answer

The DSL might not give you what you want. However, you can use the Processor API. Having said that, leftJoin can actually be used to do the "heavy lifting". Thus, after the leftJoin you can use .transform(...) with an attached state store to "clean up" the data further.

For each old&null record you receive, put it into the store. If you later receive old&new for the same key, you can remove it from the store. Furthermore, you register a punctuation, and on each punctuation call you scan the store for entries that are "old enough" that you are sure no later old&new join result will be produced. For those entries, you emit old&null and remove them from the store.
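The bookkeeping described above can be sketched without any Kafka dependency. This is only a simplified model under stated assumptions: `PendingOldTracker`, `onJoinResult`, and `expire` are hypothetical names, the in-memory map stands in for the state store attached to .transform(...), and `expire` stands in for the work done in the punctuation callback. The timeout should be at least as long as the join window so that no later old&new result can still arrive.

```kotlin
// Kafka-free model of the "store + periodic scan" idea (all names are hypothetical).
class PendingOldTracker(private val timeoutMs: Long) {
    // id -> timestamp at which the first old&null result was seen.
    // In a real topology this would be a state store, not a map.
    private val pending = LinkedHashMap<String, Long>()

    /** Handles one leftJoin result; newValue is null for an old&null record. */
    fun onJoinResult(id: String, newValue: String?, timestampMs: Long) {
        if (newValue == null) {
            if (id !in pending) pending[id] = timestampMs
        } else {
            // A later old&new result cancels the pending entry.
            pending.remove(id)
        }
    }

    /** Returns and removes every id that has waited at least timeoutMs. */
    fun expire(nowMs: Long): List<String> {
        val expired = pending.filterValues { nowMs - it >= timeoutMs }.keys.toList()
        expired.forEach { pending.remove(it) }
        return expired
    }
}

fun main() {
    val tracker = PendingOldTracker(timeoutMs = 120_000)
    tracker.onJoinResult("1", null, 0)           // 1-old arrived, no 1-new yet
    tracker.onJoinResult("2", null, 10_000)      // 2-old arrived, no 2-new yet
    tracker.onJoinResult("2", "value", 20_000)   // 2-new arrived in time
    println(tracker.expire(60_000))   // prints []
    println(tracker.expire(120_000))  // prints [1]
}
```

In this model only id 1 is reported, because id 2 received its old&new result before the timeout elapsed.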

As an alternative, you can also omit the join and do everything in a single transform() with state. For this, you would need to KStream#merge() the old and new streams and call transform() on the merged stream.
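As a rough illustration of the merged-stream variant, the sketch below tracks both key types in one stateful component. It is a Kafka-free model, not the answer's actual code: `MergedStreamDetector`, `onRecord`, and `expire` are hypothetical names, the two maps stand in for state stores, keys are assumed to end in "-old" or "-new" as in the question, and records are assumed to arrive in timestamp order.

```kotlin
// Kafka-free model of a single stateful step over the merged stream
// (all names are hypothetical; maps stand in for state stores).
class MergedStreamDetector(private val windowMs: Long) {
    private val oldSeen = HashMap<String, Long>() // id -> timestamp of unmatched old
    private val newSeen = HashMap<String, Long>() // id -> timestamp of unmatched new

    fun onRecord(key: String, timestampMs: Long) {
        val id = key.substringBeforeLast('-')
        if (key.endsWith("-old")) {
            val newTs = newSeen.remove(id)
            // Keep old as pending unless a new record lies inside the window.
            if (newTs == null || timestampMs - newTs > windowMs) oldSeen[id] = timestampMs
        } else if (key.endsWith("-new")) {
            val oldTs = oldSeen[id]
            if (oldTs != null && timestampMs - oldTs <= windowMs) {
                oldSeen.remove(id)          // matched in time, nothing to report
            } else {
                newSeen[id] = timestampMs   // old may still arrive later
            }
        }
    }

    /** Returns ids whose old record found no new record within the window. */
    fun expire(nowMs: Long): List<String> {
        // Unmatched new records are not reported, only cleaned up.
        newSeen.values.removeIf { nowMs - it > windowMs }
        val suspicious = oldSeen.filterValues { nowMs - it > windowMs }.keys.sorted()
        suspicious.forEach { oldSeen.remove(it) }
        return suspicious
    }
}

fun main() {
    val detector = MergedStreamDetector(windowMs = 120_000)
    detector.onRecord("1-old", 0)
    detector.onRecord("2-old", 1_000)
    detector.onRecord("2-new", 30_000)
    detector.onRecord("3-new", 40_000)
    println(detector.expire(200_000)) // prints [1]
}
```

Compared with the leftJoin approach, this keeps all matching logic in one place, at the cost of re-implementing the windowing by hand.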

Note: instead of registering a punctuation, you can also put the "scan logic" into the transform and execute it each time you process a record.
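A minimal sketch of that variant, again as a Kafka-free model with hypothetical names (`ScanOnRecordTracker`, `process`): the scan runs inside the per-record handler and uses the current record's timestamp as "now".

```kotlin
// Kafka-free model of scanning on every record instead of on a punctuation
// (names are hypothetical; the map stands in for a state store).
class ScanOnRecordTracker(private val timeoutMs: Long) {
    private val pending = HashMap<String, Long>() // id -> timestamp of first old&null

    /** Processes one join result and returns the ids that expired as of this record. */
    fun process(id: String, hasNew: Boolean, timestampMs: Long): List<String> {
        if (hasNew) {
            pending.remove(id)
        } else if (id !in pending) {
            pending[id] = timestampMs
        }
        // Scan logic: the current record's timestamp plays the role of "now".
        val expired = pending.filterValues { timestampMs - it >= timeoutMs }.keys.sorted()
        expired.forEach { pending.remove(it) }
        return expired
    }
}

fun main() {
    val tracker = ScanOnRecordTracker(timeoutMs = 120_000)
    println(tracker.process("1", hasNew = false, timestampMs = 0))        // prints []
    println(tracker.process("2", hasNew = false, timestampMs = 120_000))  // prints [1]
}
```

The trade-off is that expired entries are only detected when another record arrives, and a full scan per record can become expensive if the store is large.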
