How to run two or more topologies with the same APPLICATION_ID_CONFIG?
Question
I want to run two topologies on the same instance. One topology involves a state store and the other a global store. How do I do this successfully?
I created one topic with 3 partitions, then added a state store in the first topology and a global store in the second.
Topology 1:
public void createTopology() {
    Topology topology = new Topology();
    topology.addSource("source", new KeyDeserializer(), new ValueDeserializer(), "topic1");
    topology.addProcessor("processor1", new CustomProcessorSupplier1(), "source");

    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(
            new RocksDbKeyValueBytesStoreSupplier("rstore"),
            Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingEnabled(new HashMap<>());
    topology.addStateStore(rStoreBuilder, "processor1");

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1");
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);

    streams = new KafkaStreams(topology, p);
    streams.start();
}
Topology 2:
public void createTopology() {
    Topology topology = new Topology();

    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(
            new RocksDbKeyValueBytesStoreSupplier("rstoreg"),
            Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingDisabled();
    topology.addGlobalStore(rStoreBuilder, "globalprocessname",
            Serdes.Bytes().deserializer(), Serdes.ByteArray().deserializer(),
            "topic1", "processor2", new CustomProcessorSupplier1());

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1");
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
    p.put(STATE_DIR_CONFIG, "/tmp/" + System.getProperty("server.port"));

    streams = new KafkaStreams(topology, p);
    streams.start();
}
When running a single instance:
Expected: Both the state store and the global store contain all keys (data from all input partitions of topic1).
Actual: The state store contains data from 2 partitions; the global store contains data from 1 partition.
When running 2 instances of this code:
Expected: Both global stores contain all the data, while the 3 partitions are divided between the 2 state stores, each holding partial data.
Actual (S = state store, G = global store, P = partition of the input data):
S1 - P1
G1 - P2
S2 - P3
G2 - P1, P2, P3
Answer
The issue is with StreamsConfig.APPLICATION_ID_CONFIG: you use the same value for two different types of applications.

The value of StreamsConfig.APPLICATION_ID_CONFIG is used as the consumer group.id, and group.id is what Kafka uses to scale an application. If you run two instances of the same application (with the same group.id), each of them starts processing messages from only a subset of the partitions.
In your case you have two different applications, but they use the same StreamsConfig.APPLICATION_ID_CONFIG. Each of them is therefore assigned only a subset of the partitions (App1: 2 partitions, App2: 1 partition), and each processes only a subset of all the messages. This is the consumer group mechanism.
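Following from this, a minimal sketch of the fix: give each KafkaStreams application its own application.id so the two topologies join separate consumer groups and each consumes all of topic1. The id strings, the bootstrap address, and the helper method below are illustrative, not from the original code:

```java
import java.util.Properties;

public class DistinctAppIds {

    // Build a Streams-style config. The application.id doubles as the
    // consumer group.id, so distinct ids mean distinct consumer groups.
    static Properties streamsConfig(String applicationId, String bootstrapServers) {
        Properties p = new Properties();
        p.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        p.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        return p;
    }

    public static void main(String[] args) {
        // Topology 1 (state store) and topology 2 (global store) now run in
        // separate consumer groups, so each receives data from every partition.
        Properties p1 = streamsConfig("stream1-statestore", "localhost:9092");
        Properties p2 = streamsConfig("stream2-globalstore", "localhost:9092");

        System.out.println(p1.getProperty("application.id")
                .equals(p2.getProperty("application.id")));
        // → false
    }
}
```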
More about consumer groups can be found here: