如何使用相同的APPLICATION_ID_CONFIG运行两个或多个拓扑? [英] How to run two or more topologies with the same APPLICATION_ID_CONFIG?

查看:62
本文介绍了如何使用相同的APPLICATION_ID_CONFIG运行两个或多个拓扑?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在同一实例上运行2种拓扑. 1个拓扑涉及状态存储,而其他拓扑涉及全局存储.我如何成功做到这一点?

I want to run 2 topologies on same instance. 1 topology involves state store and other involves global store. How do I do this succesfully?

我创建了1个具有3个分区的主题,然后在1个拓扑中添加了状态存储,并在2nd拓扑中添加了全局存储.

I have created 1 topic with 3 partitions and then added a state store in 1 topology and global store in 2nd topology.

拓扑1:

    public void createTopology() {
    Topology topology = new Topology();

    topology.addSource("source", new KeyDeserializer(), new ValueDeserializer(), "topic1");
    topology.addProcessor("processor1", new CustomProcessorSupplier1(), "source");

    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstore"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingEnabled(new HashMap<>());

    topology.addStateStore(rStoreBuilder, "processor1");

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1");
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
    streams = new KafkaStreams(topology, p);
    streams.start();
}

拓扑2:

public void createTopology() {
    Topology topology = new Topology();

    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstoreg"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingDisabled();

    topology.addGlobalStore(rStoreBuilder, "globalprocessname", Serdes.Bytes().deserializer(), Serdes.ByteArray().deserializer(), "topic1", "processor2", new CustomProcessorSupplier1());

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1");
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
    p.put(STATE_DIR_CONFIG, "/tmp/" + System.getProperty("server.port"));
    streams = new KafkaStreams(topology, p);
    streams.start();
}
}

运行单个实例时:-

预期: 状态存储和全局存储都必须包含所有键(来自topic1所有输入分区的数据

Expected: Both state-store and global-store must contain all keys (data from all input partitions of topic1

实际: 状态存储包含2个分区中的数据 全局存储包含来自1个分区的数据

Actual: State store contains data from 2 partitions Global store contains data from 1 partition

运行此代码的2个实例时:-

预期:两个全局存储区都必须包含所有数据. 3个分区分为2个状态存储,其中包含部分数据

Expected: Both global stores must contain all the data. 3 partitions are divided among 2 state stores and contain partial data

Actual:(S表示状态存储,G表示全局存储,P表示输入数据的分区) S1-P1 G1-P2 S2-P3 G2-P1,P2,P3

Actual: (S means statestore, G means global store, P means partition of input data) S1 - P1 G1 - P2 S2 - P3 G2 - P1, P2, P3

推荐答案

问题出在StreamsConfig.APPLICATION_ID_CONFIG.您可以将它们用于两种不同类型的应用程序.

The issue is with StreamsConfig.APPLICATION_ID_CONFIG. You use same for two different types of applications.

StreamsConfig.APPLICATION_ID_CONFIG的值用作group.id. group.id用于扩展应用程序.如果您有同一应用程序的两个实例(具有相同的group.id),它们将开始处理分区子集中的消息.

Value of StreamsConfig.APPLICATION_ID_CONFIG is used as group.id. group.id is used for scaling application. If you have two instance of same application (with same group.id), they start processing messages from subset of partitions.

在您的情况下,您有两个不同的应用程序,但是它们使用相同的StreamsConfig.APPLICATION_ID_CONFIG.为它们中的每一个分配分区的子集(App1:2个分区,App2:1分区),并且它们仅处理整个消息的子集.这是消费者组机械.

In your case you have two different applications but they used same StreamsConfig.APPLICATION_ID_CONFIG. For each of them subset of partitions is assign (App1: 2 partitions, App2: 1 partition) and they process only subset of whole message. It is Consumer group mechanizm.

有关消费者组的更多信息,您可以找到:

More about Consumer group you can find:

这篇关于如何使用相同的APPLICATION_ID_CONFIG运行两个或多个拓扑?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆