Aggregation and state store retention in Kafka Streams


Question


I have a use case like the following. For each incoming event, I want to look at a certain field to see if its status has changed from A to B and, if so, send the event to an output topic. The flow is like this: an event with key "xyz" comes in with status A, and some time later another event comes in with key "xyz" with status B. I have this code using the high-level DSL.

final KStream<String, DomainEvent> inputStream = ...

final KStream<String, DomainEvent> outputStream = inputStream
        .map((k, v) -> new KeyValue<>(v.getId(), v))
        .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
        .aggregate(DomainStatusMonitor::new,
                (k, v, aggregate) -> {
                    aggregate.updateStatusMonitor(v);
                    return aggregate;
                },
                Materialized.with(Serdes.String(), jsonSerde))
        .toStream()
        .filter((k, v) -> v.isStatusChangedFromAtoB())
        .map((k, v) -> new KeyValue<>(k, v.getDomainEvent()));


Is there a better way to write this logic using the DSL?


A couple of questions regarding the state store created by the aggregation in the code above:

  1. Is it creating an in-memory state store by default?
  2. What will happen if I have an unbounded number of unique incoming keys? If it is using an in-memory store by default, don't I need to switch to a persistent store? How do we handle situations like that in the DSL?
  3. If the state store is very large (either in-memory or persistent), how does it affect the startup time? How can I make the stream processing wait so that the store gets fully initialized? Or will Kafka Streams ensure that the state store is fully initialized before any incoming events are processed?

Thanks in advance!

Answer

  1. By default, a persistent RocksDB store is used. If you want to use an in-memory store instead, pass in Materialized.as(Stores.inMemoryKeyValueStore(...)).
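A minimal sketch of how the aggregation from the question could be switched to an explicitly named in-memory store (the store name "status-monitor-store" is an assumption, not from the question; jsonSerde and DomainStatusMonitor are the question's own types):

```java
// Sketch only: same aggregation as in the question, but with the default
// RocksDB store swapped for a named in-memory store.
KTable<String, DomainStatusMonitor> monitors = inputStream
        .map((k, v) -> new KeyValue<>(v.getId(), v))
        .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
        .aggregate(DomainStatusMonitor::new,
                (k, v, aggregate) -> {
                    aggregate.updateStatusMonitor(v);
                    return aggregate;
                },
                Materialized.<String, DomainStatusMonitor>as(
                                Stores.inMemoryKeyValueStore("status-monitor-store"))
                        .withKeySerde(Serdes.String())
                        .withValueSerde(jsonSerde));
```

Note that an in-memory store is still backed by a changelog topic for fault tolerance, so durability is not lost; only the local storage medium changes.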


  2. If you have an unbounded number of unique keys, you will eventually run out of main memory or disk and your application will die. Depending on your semantics, you can get a "TTL" that expires old keys by using a session-windowed aggregation with a large "gap" parameter instead.
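A sketch of that session-window "TTL" idea, assuming a 7-day inactivity gap (the gap value and the DomainStatusMonitor.merge() helper are assumptions for illustration):

```java
// Sketch only: keys that see no new event within the gap start a new session,
// and old sessions are eventually dropped when the window retention passes.
final KStream<Windowed<String>, DomainStatusMonitor> expiring = inputStream
        .map((k, v) -> new KeyValue<>(v.getId(), v))
        .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
        .windowedBy(SessionWindows.with(TimeUnit.DAYS.toMillis(7)))  // "gap"
        .aggregate(DomainStatusMonitor::new,
                (k, v, agg) -> {
                    agg.updateStatusMonitor(v);
                    return agg;
                },
                // Session merger: combines two sessions that the gap has joined.
                // merge() is a hypothetical method on DomainStatusMonitor.
                (k, agg1, agg2) -> agg1.merge(agg2),
                Materialized.with(Serdes.String(), jsonSerde))
        .toStream();
```

The key of the resulting stream becomes Windowed<String>, so downstream code would read the original key via key.key().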


  3. The state will always be restored before any new data is processed. If you use an in-memory store, this happens by consuming the underlying changelog topic; depending on the size of your state, this can take a while. If you use a persistent RocksDB store, the state is loaded from disk, so no restore is required and processing starts immediately. Only if you lose the state on the local disk will a restore from the changelog topic be needed.
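If you want visibility into that restore phase, a restore listener can be registered on the KafkaStreams instance; a minimal sketch (builder and props are assumed to be configured elsewhere):

```java
// Sketch only: log restore progress. The application only starts processing
// records once restoration of its state stores has completed.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
    @Override
    public void onRestoreStart(TopicPartition tp, String storeName,
                               long startOffset, long endOffset) {
        System.out.printf("Restoring %s: offsets %d..%d%n",
                storeName, startOffset, endOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName,
                                long batchEndOffset, long numRestored) {
        // Called after each restored batch; useful for progress metrics.
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName,
                             long totalRestored) {
        System.out.printf("Restored %s (%d records)%n", storeName, totalRestored);
    }
});
streams.start();
```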
