kafka 流中的聚合和状态存储保留 [英] Aggregration and state store retention in kafka streams

查看:28
本文介绍了kafka 流中的聚合和状态存储保留的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个像下面这样的用例.对于每个传入的事件,我想看看某个字段以查看它的状态是否从 A 变为 B,如果是,则将其发送到输出主题.流程是这样的:一个键为xyz"的事件以状态 A 进入,一段时间后另一个事件带有状态 B 的键xyz".我使用高级 DSL 编写了此代码.

I have a use case like the following. For each incoming event, I want to look at a certain field to see if it's status changed from A to B and if so, send that to an output topic. The flow is like this: An event with key "xyz" comes in with status A, and some time later another event comes in with key "xyz" with status B. I have this code using the high level DSL.

final KStream<String, DomainEvent> inputStream....

final KStream<String, DomainEvent> outputStream = inputStream
          .map((k, v) -> new KeyValue<>(v.getId(), v))
                    .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                    .aggregate(DomainStatusMonitor::new,
                            (k, v, aggregate) -> {
                                aggregate.updateStatusMonitor(v);
                                return aggregate;
                            }, Materialized.with(Serdes.String(), jsonSerde))
                    .toStream()
                    .filter((k, v) -> v.isStatusChangedFromAtoB())
                    .map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));

是否有更好的方法使用 DSL 编写此逻辑?

Is there a better way to write this logic using the DSL?

关于上面代码中的聚合创建的状态存储的几个问题.

Couple of questions regarding the state store created by the aggregation in the code above.

  1. 是否默认创建内存状态存储?
  2. 如果我有无限数量的唯一传入密钥,会发生什么?如果它默认使用内存存储,我不需要切换到持久存储吗?我们如何处理 DSL 中的此类情况?
  3. 如果状态存储非常大(内存或持久),它会如何影响启动时间?如何让流处理等待以便商店完全初始化?或者 Kafka Streams 是否会确保在处理任何传入事件之前完全初始化状态存储?

提前致谢!

推荐答案

  1. 默认情况下,将使用持久性 RocksDB 存储.如果你想使用内存存储,你需要传入 Materialized.as(Stores.inMemoryKeyValueStore(...))

如果你有无数个唯一键,你最终会耗尽主内存或磁盘,你的应用程序就会死掉.根据您的语义,您可以通过使用带有大间隙"参数的会话窗口聚合而不是使旧密钥过期来获得TTL".

If you have an infinite number of unique keys, you will eventually run out of main-memory or disk and your application will die. Depending on your semantics, you can get a "TTL" by using a session windowed aggregation with a large "gap" parameter instead to expire old keys.

在处理新数据之前总是会恢复状态.如果您使用内存存储,这将通过使用底层更改日志主题来实现.根据您所在州的规模,这可能需要一段时间.如果你使用持久的 RocksDB 存储,状态将从磁盘加载,因此不需要恢复,处理应该立即发生.只有当您丢失本地磁盘上的状态时,才会在这种情况下从更改日志主题中恢复.

The state will always be restored before processing new data happens. If you use in-memory store, this will happen by consuming the underlying changelog topic. Depending on the size of your state, this can take a while. If you use persistent RocksDB store, the state will be loaded from disk and thus no restore will be required and processing should happen immediately. Only if you loose the state on local disk, a restore from the changelog topic will happen for this case.

这篇关于kafka 流中的聚合和状态存储保留的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆