How to register a stateless processor (that seems to require a StateStore as well)?


Problem description


I'm building a topology and want to use KStream.process() to write some intermediate values to a database. This step doesn't change the nature of the data and is completely stateless.

Adding a Processor requires creating a ProcessorSupplier and passing this instance to the KStream.process() function along with the name of a state store. This is what I don't understand.

How do I add a StateStore object to a topology, given that it requires a StateStoreSupplier?

Failing to add said StateStore gives this error when the application is started:

Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore my-state-store is not added yet.

Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state.

The Javadoc for KStream.process() says only: "Process all elements in this stream, one element at a time, by applying a Processor."

Solution

Here's a simple example of how to use state stores, taken from the Confluent Platform documentation on Kafka Streams.

Step 1: Defining the StateStore/StateStoreSupplier:

StateStoreSupplier countStore = Stores.create("Counts")
                                      .withKeys(Serdes.String())
                                      .withValues(Serdes.Long())
                                      .persistent()
                                      .build();

"I don't see a way to add a StateStore object to my topology. It requires a StateStoreSupplier as well though."

Step 2: Adding the state store to your topology.

Option A - When using the Processor API:

TopologyBuilder builder = new TopologyBuilder();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
       .addProcessor("Process", () -> new WordCountProcessor(), "Source")
       // Add the countStore associated with the WordCountProcessor processor
       .addStateStore(countStore, "Process")
       .addSink("Sink", "sink-topic", "Process");

Option B - When using the Kafka Streams DSL:

Here you need to call KStreamBuilder#addStateStore() with your StateStoreSupplier (countStore in this example) to add the state store to your processor topology. Then, when calling methods such as KStream#process() or KStream#transform(), you must also pass in the name of each state store that the processor or transformer accesses; otherwise your application will fail at runtime.

Using KStream#transform() as an example:

KStreamBuilder builder = new KStreamBuilder();

// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);

KStream<byte[], String> input = builder.stream("source-topic");

KStream<String, Long> transformed =
    input.transform(/* your TransformerSupplier */, countStore.name());

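"Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state."

You are right: you don't need a state store if your processor does not maintain state. The state store names in KStream#process(ProcessorSupplier, String... stateStoreNames) are a varargs parameter, so a stateless processor can be registered by passing only the ProcessorSupplier and no store names.

If your processor does need state when using the DSL, simply call KStreamBuilder#addStateStore() with the StateStoreSupplier to add the state store to your processor topology, and then reference it by name later on.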
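The error in the question only arises when a store name is passed for a store that was never added. The following is a minimal plain-Java sketch of that check, not Kafka Streams code: MiniBuilder and its methods are hypothetical stand-ins that only mimic the shape of addStateStore() and of a process() call whose store names are a varargs parameter.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java model, NOT the Kafka Streams API. MiniBuilder is a hypothetical
// stand-in that only imitates the "store must be added first" validation.
public class StoreNameCheckSketch {

    public static class MiniBuilder {
        private final Set<String> registeredStores = new HashSet<>();

        // Stand-in for registering a state store with the topology.
        public void addStateStore(String storeName) {
            registeredStores.add(storeName);
        }

        // Stand-in for process(supplier, String... stateStoreNames).
        // The varargs parameter may be empty; then there is nothing to validate.
        public int process(Runnable processor, String... stateStoreNames) {
            for (String name : stateStoreNames) {
                if (!registeredStores.contains(name)) {
                    throw new IllegalStateException(
                        "StateStore " + name + " is not added yet.");
                }
            }
            return stateStoreNames.length; // number of stores connected
        }
    }

    public static void main(String[] args) {
        MiniBuilder builder = new MiniBuilder();

        // Stateless processor: no store names passed, so no store is required.
        System.out.println(builder.process(() -> { }));

        // Naming a store that was never added reproduces the failure.
        try {
            builder.process(() -> { }, "my-state-store");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real DSL the equivalent of the first call would be invoking process() with just your ProcessorSupplier; the second call corresponds to passing "my-state-store" without having added the store first.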
