How to run two or more topologies with the same APPLICATION_ID_CONFIG?

Question

I want to run two topologies on the same instance. One topology uses a state store and the other uses a global store. How can I do this successfully?

I created one topic with 3 partitions, then added a state store to the first topology and a global store to the second.

Topology 1:

public void createTopology() {
    Topology topology = new Topology();

    // Read topic1 and route its records to processor1.
    topology.addSource("source", new KeyDeserializer(), new ValueDeserializer(), "topic1");
    topology.addProcessor("processor1", new CustomProcessorSupplier1(), "source");

    // RocksDB-backed key-value store with changelog logging enabled.
    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstore"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingEnabled(new HashMap<>());

    // Attach the store to processor1.
    topology.addStateStore(rStoreBuilder, "processor1");

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1");
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
    streams = new KafkaStreams(topology, p);
    streams.start();
}

Topology 2:

public void createTopology() {
    Topology topology = new Topology();

    // RocksDB-backed key-value store with changelog logging disabled.
    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstoreg"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingDisabled();

    // Register the store as a global store populated from topic1.
    topology.addGlobalStore(rStoreBuilder, "globalprocessname", Serdes.Bytes().deserializer(), Serdes.ByteArray().deserializer(), "topic1", "processor2", new CustomProcessorSupplier1());

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1"); // same application ID as topology 1
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
    p.put(STATE_DIR_CONFIG, "/tmp/" + System.getProperty("server.port"));
    streams = new KafkaStreams(topology, p);
    streams.start();
}

When running a single instance:

Expected: both the state store and the global store must contain all keys (data from all input partitions of topic1).

Actual: the state store contains data from 2 partitions; the global store contains data from 1 partition.

When running 2 instances of this code:

Expected: both global stores must contain all the data. The 3 partitions are divided between the 2 state stores, so each state store contains partial data.

Actual (S = state store, G = global store, P = partition of the input data):

S1 - P1
G1 - P2
S2 - P3
G2 - P1, P2, P3

Recommended answer

The issue is with StreamsConfig.APPLICATION_ID_CONFIG: you use the same value for two different types of applications.

The value of StreamsConfig.APPLICATION_ID_CONFIG is used as the group.id, and the group.id is what lets an application scale out. If you run two instances of the same application (with the same group.id), each instance processes messages from a subset of the partitions.
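To make the splitting concrete, here is a toy sketch of how three partitions end up divided between two members that share one group.id. It is only an illustration: the `assign` helper and the member names are hypothetical, and the simple round-robin rule is an assumption that stands in for Kafka's real assignors, which are more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {

    // Toy round-robin split of partitions 0..partitionCount-1 across the
    // members of a single consumer group. Not Kafka's actual assignor.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two members with the same group.id share a 3-partition topic.
        Map<String, List<Integer>> result =
                assign(Arrays.asList("instance-A", "instance-B"), 3);
        System.out.println(result); // prints {instance-A=[0, 2], instance-B=[1]}
    }
}
```

Each partition has exactly one owner within the group, so no single member sees every record.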

In your case you have two different applications, but they use the same StreamsConfig.APPLICATION_ID_CONFIG. Each of them is assigned a subset of the partitions (App1: 2 partitions, App2: 1 partition), so each processes only a subset of all the messages. This is the consumer group mechanism.
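Following that reasoning, one way to avoid the shared group is to give each topology its own application ID. The sketch below builds the two configurations with plain `java.util.Properties`; it uses the literal keys `application.id` and `bootstrap.servers` (the strings behind StreamsConfig.APPLICATION_ID_CONFIG and BOOTSTRAP_SERVERS_CONFIG) so that it runs without the Kafka libraries. The `configFor` helper, the two ID values, and the `localhost:9092` address are assumptions for illustration, not part of the original code.

```java
import java.util.Properties;

public class SeparateApplicationIds {

    // Literal keys behind the StreamsConfig constants used in the question.
    static final String APPLICATION_ID = "application.id";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    // Hypothetical helper: builds the base config for one topology.
    static Properties configFor(String applicationId, String bootstrapServers) {
        Properties p = new Properties();
        p.put(APPLICATION_ID, applicationId);
        p.put(BOOTSTRAP_SERVERS, bootstrapServers);
        return p;
    }

    public static void main(String[] args) {
        // Assumed broker address; replace with KafkaUtil.getBootStrapServers().
        String brokers = "localhost:9092";

        // One ID per topology, so the two do not join the same consumer group.
        Properties stateStoreConfig = configFor("stream1-state", brokers);
        Properties globalStoreConfig = configFor("stream1-global", brokers);

        System.out.println(stateStoreConfig.getProperty(APPLICATION_ID));
        System.out.println(globalStoreConfig.getProperty(APPLICATION_ID));
    }
}
```

In the original code, each `Properties` object would then be passed to its own `new KafkaStreams(topology, p)` call, with the serde settings added as before.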

You can find more about consumer groups here:
