How to register a stateless processor (that seems to require a StateStore as well)?
Problem description
I'm building a topology and want to use KStream.process() to write some intermediate values to a database. This step doesn't change the nature of the data and is completely stateless.
Adding a Processor requires creating a ProcessorSupplier and passing this instance to the KStream.process() function along with the name of a state store. This is what I don't understand.
How do I add a StateStore object to a topology, given that it requires a StateStoreSupplier?
Failing to add said StateStore gives this error when the application starts:
Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore my-state-store is not added yet.
为什么处理器必须具有状态存储?对于无状态且不维护状态的处理器,这似乎是可选的.
Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state.
From the KStream.process() Javadoc: "Process all elements in this stream, one element at a time, by applying a Processor."
Recommended answer
Here is a simple example of how to use state stores.
Step 1: Defining the StateStore/StateStoreSupplier:
StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();
- I don't see a way to add a StateStore object to the topology. It requires a StateStoreSupplier as well, though.
Step 2: Adding the state store to the topology.
Option A - When using the Processor API:
TopologyBuilder builder = new TopologyBuilder();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")
    // Add the countStore associated with the WordCountProcessor processor
    .addStateStore(countStore, "Process")
    .addSink("Sink", "sink-topic", "Process");
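The answer does not show WordCountProcessor itself. Below is a hypothetical sketch of what it might look like, assuming the same 0.10.x-era Processor API used above (AbstractProcessor, ProcessorContext#getStateStore). The word-splitting logic and the decision to forward each updated count are assumptions. What it illustrates is that the processor looks the store up by the name the StateStoreSupplier was created with ("Counts"), which is why that store must be attached to the "Process" node via addStateStore().

```java
import java.util.Locale;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical sketch of the WordCountProcessor referenced above
// (0.10.x-era Processor API; not the original documentation's code).
public class WordCountProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, Long> counts;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context); // keeps context() usable in process()
        // Must match the name given to Stores.create("Counts") and the store
        // must be connected to this node, otherwise the lookup fails.
        counts = (KeyValueStore<String, Long>) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String line) {
        for (String word : line.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (word.isEmpty()) {
                continue; // split() can yield a leading empty token
            }
            Long old = counts.get(word);
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(word, updated);
            // Assumption: the sink's configured serdes accept String keys and Long values.
            context().forward(word, updated);
        }
    }
}
```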
Option B - When using the Kafka Streams DSL:
Here you need to call KStreamBuilder#addStateStore() with your StateStoreSupplier (e.g. countStore from Step 1) to add the state store to your processor topology. Then, when calling methods such as KStream#process() or KStream#transform(), you must also pass in the name of the state store; otherwise your application will fail at runtime.
Using KStream#transform() as an example:
KStreamBuilder builder = new KStreamBuilder();
// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);
KStream<byte[], String> input = builder.stream("source-topic");
KStream<String, Long> transformed =
    input.transform(/* your TransformerSupplier */, countStore.name());
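One possible way to fill in the `/* your TransformerSupplier */` placeholder is sketched below. It is hypothetical and assumes the 0.10.x-era Transformer interface (which still declares punctuate(long)); the class name CountingTransformerSupplier and the "count each distinct value" logic are illustrative only. It reads and updates the "Counts" store by name, which works only because that store was added with addStateStore() and its name was passed to transform().

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical supplier for input.transform(...): counts how often each
// String value has been seen, using the "Counts" store.
public class CountingTransformerSupplier
        implements TransformerSupplier<byte[], String, KeyValue<String, Long>> {

    @Override
    public Transformer<byte[], String, KeyValue<String, Long>> get() {
        // A new Transformer instance per call; Kafka Streams calls get() per task.
        return new Transformer<byte[], String, KeyValue<String, Long>>() {
            private KeyValueStore<String, Long> counts;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                counts = (KeyValueStore<String, Long>) context.getStateStore("Counts");
            }

            @Override
            public KeyValue<String, Long> transform(byte[] key, String value) {
                Long old = counts.get(value);
                long updated = (old == null) ? 1L : old + 1L;
                counts.put(value, updated);
                return new KeyValue<>(value, updated); // re-keyed by the value
            }

            @Override
            public KeyValue<String, Long> punctuate(long timestamp) {
                return null; // null means nothing is forwarded on punctuation
            }

            @Override
            public void close() {
                // The store's lifecycle is managed by Kafka Streams.
            }
        };
    }
}
```

With this class, the call would read `input.transform(new CountingTransformerSupplier(), countStore.name());`.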
Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state.
You are right -- you don't need a state store if your processor does not maintain state.
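When using the DSL, you only need to call KStreamBuilder#addStateStore() (and then reference the store by name in KStream#process() or KStream#transform()) if your processor actually keeps state. For a stateless processor, skip both steps: the stateStoreNames parameter of KStream#process() is a varargs parameter, so you can call it with only the ProcessorSupplier argument.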
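For the original use case (writing intermediate values to a database without keeping state), a minimal sketch assuming the same 0.10.x-era DSL (KStreamBuilder, AbstractProcessor) might look like this. The database write is abstracted as a hypothetical BiConsumer callback so the processor stays testable; the names StatelessDbWriter, DbWriterProcessor and buildTopology are illustrative. No store is added and no store name is passed to process().

```java
import java.util.function.BiConsumer;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class StatelessDbWriter {

    // Stateless processor: hands each record to a writer callback and keeps
    // nothing between records, so it needs no StateStore.
    public static class DbWriterProcessor extends AbstractProcessor<String, String> {
        private final BiConsumer<String, String> writer;

        public DbWriterProcessor(BiConsumer<String, String> writer) {
            this.writer = writer;
        }

        @Override
        public void process(String key, String value) {
            // Hypothetical: the callback would wrap your real DB client call.
            writer.accept(key, value);
        }
    }

    public static KStreamBuilder buildTopology(BiConsumer<String, String> writer) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> input = builder.stream("source-topic");
        // No state store names: the varargs stateStoreNames parameter is left empty,
        // so no addStateStore() call is needed either.
        input.process(() -> new DbWriterProcessor(writer));
        return builder;
    }
}
```

Note that process() returns void in this API, so it ends that branch of the stream; in a real application the returned builder would be passed, together with a StreamsConfig, to new KafkaStreams(builder, config) before calling start().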