Kafka Stateful Stream processor with statestore: Behind the scenes


Question

I am trying to understand stateful stream processors.

As I understand it, this type of stream processor maintains some sort of state using a State Store.

I learned that one way to implement a State Store is with RocksDB. Assume the following topology (with only one stateful processor):

A->B->C ; processor B is stateful, with a local state store and changelog enabled. I am using the low-level API.

Assume the stream processor (sp) listens on a single Kafka topic, say topic-1, with 10 partitions.

I observed that when the application is started (2 instances on different physical machines, each with num.stream.threads = 5), it creates a directory structure for the state store that looks like this:

0_0, 0_1, 0_2.... 0_9 (each machine has five, so 10 partitions in total).

I was going through some online material which said we should create a StoreBuilder and attach it to the topology using addStateStore(), instead of creating a state store within a processor.

Like:

topology.addStateStore(storeBuilder,"processorName")

Ref also: org.apache.kafka.streams.state.Stores

I don't understand the difference between attaching a storeBuilder to the topology and actually creating a state store within the processor. What is the difference between them?

The second part: for the state store it creates directories like 0_0, 0_1, etc. Who creates them, and how? Is there some sort of 1:1 mapping between the Kafka topics (that the sp is listening on) and the number of directories created for the State Store?

Recommended answer

I don't understand the difference between attaching a storeBuilder to the topology and actually creating a state store within the processor. What is the difference between them?

For Kafka Streams to manage the store for you (fault tolerance, migration), it needs to be aware of the store. Thus, you give Kafka Streams a StoreBuilder, and Kafka Streams creates and manages the store for you.

If you just create a store inside your processor, Kafka Streams is not aware of the store, and the store won't be fault-tolerant.
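To make the difference concrete, here is a minimal sketch of the A->B->C topology from the question, assuming the kafka-streams 3.x library is on the classpath and the typed Processor API (org.apache.kafka.streams.processor.api) is used. The store name "counts-store", the sink topic "output-topic", and the CountingProcessor class are hypothetical names chosen for illustration; only topic-1 and the node names A, B, C come from the question. The processor never constructs a store itself: it only looks up the one that Kafka Streams built from the StoreBuilder.

```java
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

final class StatefulTopologySketch {

    // Hypothetical store name, shared by the builder and the processor lookup.
    static final String STORE_NAME = "counts-store";

    // Hypothetical processor B: counts how many records it has seen per key.
    static final class CountingProcessor implements Processor<String, String, String, Long> {
        private ProcessorContext<String, Long> context;
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            // Ask Kafka Streams for the managed store; do not create one here.
            this.store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, String> record) {
            Long previous = store.get(record.key());
            Long updated = (previous == null ? 0L : previous) + 1L;
            store.put(record.key(), updated);          // also written to the changelog
            context.forward(record.withValue(updated)); // pass the count on to C
        }
    }

    static Topology build() {
        // Persistent (RocksDB-backed) key-value store; changelogging is on by default.
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME),
                Serdes.String(),
                Serdes.Long());

        ProcessorSupplier<String, String, String, Long> supplier = CountingProcessor::new;

        Topology topology = new Topology();
        topology.addSource("A", new StringDeserializer(), new StringDeserializer(), "topic-1");
        topology.addProcessor("B", supplier, "A");
        // Register the store with the topology and connect it to processor B.
        topology.addStateStore(storeBuilder, "B");
        topology.addSink("C", "output-topic", new StringSerializer(), new LongSerializer(), "B");
        return topology;
    }
}
```

Building the topology does not need a running broker, so printing StatefulTopologySketch.build().describe() is a quick way to check that the store is attached to B.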

For the state store it creates directories like 0_0, 0_1, etc. Who creates them, and how? Is there some sort of 1:1 mapping between the Kafka topics (that the sp is listening on) and the number of directories created for the State Store?

Yes, there is a mapping. The store is sharded based on the number of input topic partitions. You also get a "task" per partition, and the task directories are named y_z, with y being the sub-topology number and z being the partition number. Your simple topology has only one sub-topology, so all the directories you see have the same 0_ prefix.
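The y_z naming rule can be sketched in a few lines of plain Java. This is only an illustration of the naming scheme described above, not Kafka Streams code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class TaskDirNames {

    // y_z: y is the sub-topology number, z is the input partition number.
    static String taskDirName(int subTopology, int partition) {
        return subTopology + "_" + partition;
    }

    // One task directory per input partition of the given sub-topology.
    static List<String> taskDirs(int subTopology, int numPartitions) {
        List<String> dirs = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            dirs.add(taskDirName(subTopology, partition));
        }
        return dirs;
    }

    public static void main(String[] args) {
        // Sub-topology 0 reading topic-1 with 10 partitions, as in the question.
        System.out.println(taskDirs(0, 10));
    }
}
```

For the topology in the question this yields the ten names 0_0 through 0_9, split across the two machines.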

Hence, your logical store has 10 physical shards. This sharding allows Kafka Streams to migrate state when the corresponding input topic partition is assigned to a different instance. Overall, you can run up to 10 instances, and each would process one partition and host one shard of your store.
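The arithmetic behind "up to 10 instances" can be illustrated with a toy model. The sketch below spreads the 10 shards over a given number of instances round-robin; that placement rule is an assumption made for illustration only, and the real Kafka Streams task assignment decides which shard lands where. What the model does show is the counting: 2 instances host 5 shards each, 10 instances host 1 each, and any instance beyond the tenth has nothing to host.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardPlacementSketch {

    // Toy round-robin placement of task directories 0_0 .. 0_(n-1) over instances.
    static List<List<String>> assign(int numPartitions, int numInstances) {
        List<List<String>> perInstance = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            perInstance.add(new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            perInstance.get(partition % numInstances).add("0_" + partition);
        }
        return perInstance;
    }

    // Instances that end up with no shard to host.
    static int idleInstances(List<List<String>> perInstance) {
        int idle = 0;
        for (List<String> shards : perInstance) {
            if (shards.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 2));                 // 5 shards per instance
        System.out.println(idleInstances(assign(10, 12))); // instances left without work
    }
}
```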
