无法通过与StateStore不同的应用程序访问KTable [英] Cannot access KTable from a different app as StateStore

查看:66
本文介绍了无法通过与StateStore不同的应用程序访问KTable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个Java应用程序(App1,App2)来测试如何在Docker的单个实例环境中从其他应用程序访问KTable.

I have two Java Application (App1, App2) to test how to access a KTable from a different app on a single instance environment in docker.

第一个应用程序(App1)使用以下代码写入KTable.

The first App (App1) writes to a KTable with following code.

public static void main(String[] args)
    {
        final Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"gateway-service");   
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ServiceTransactionSerde.class);


    KStreamBuilder builder = new KStreamBuilder();


    KStream<String,ServiceTransaction> source = builder.stream("gateway_request_processed");


    KStream<String, Long> countByApi = source.groupBy((key,value)-> value.getApiId().toString()).count("Counts").toStream();

    countByApi.to(Serdes.String(), Serdes.Long(),"countByApi");

    countByApi.print();

    final KafkaStreams streams = new KafkaStreams(builder,props);

    streams.start();
    System.out.println(streams.state());

    System.out.println(streams.allMetadata());
    System.out.println(streams.allMetadataForStore("countByApi"));

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {


        @Override
        public void run() {
            System.out.println(streams.allMetadata());
            streams.close();
        }
    }));
}

运行生产者时,我得到了App1中代码的以下输出

When I run my producer I got following output for the code in App1

    RUNNING
[]
[]
[KTABLE-TOSTREAM-0000000006]: c00af5ee-3c2d-4d12-9c4b-3b55c1284dd6, 19

这向我显示状态=正在运行.该商店的元数据也为空.但是请求已得到处理并成功存储在KTable中(字符串,长整数).

This shows me state = RUNNING. Metadata are empty also for the store. But the request gets processed and store in the KTable successfully (String,Long).

当我运行kafka-topics.sh --list --zookeeper:2181 我得到以下主题.

When I run kafka-topics.sh --list --zookeeper:2181 I get the following topics.

bash-4.3# kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
countByApi
gateway-Counts-changelog
gateway-Counts-repartition
gateway-service-Counts-changelog
gateway-service-Counts-repartition
gateway_request_processed

这向我展示了KTable在某种程度上坚持了新主题.

This shows me that the KTable is somehow persisted with new topics.

然后我有一个带有以下代码的secound命令行应用程序(App2),该代码尝试访问此KTable作为状态存储(ReadOnlyKeyValueStore),并对其进行访问.

I then have a secound command line app (App2) with following code which tries to access this KTable as a state store (ReadOnlyKeyValueStore) and access it.

 public static void main( String[] args )
    {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service-table-client");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");


        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams(builder,props);

        streams.cleanUp();
        streams.start();
        System.out.println( "Hello World!" );
        System.out.println(streams.state());

        ReadOnlyKeyValueStore<String,Long> keyValueStore =
        streams.store("countByApi", QueryableStoreTypes.keyValueStore());

        final KeyValueIterator<String,Long> range = keyValueStore.all();

        while(range.hasNext()){
            KeyValue<String,Long> next = range.next();
            System.out.println(String.format("key: %s | value: %s", next.key,next.value));

        }

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {


                        @Override
                        public void run() {
                            System.out.println(streams.allMetadata());
                            streams.close();
                        }
        }));

    }

当我运行2. App时,我确实收到了错误消息:

When I run the 2. App I do get the error message:

    RUNNING
Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, countByApi, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
    at com.comp.streamtable.App.main(App.java:37)

不幸的是,我确实只有1个实例,并且我确认状态等于"RUNNING".

Unfortunatly I do have only 1 instance and I verify that the state is equal "RUNNING".

注意:我必须为每个应用程序选择不同的application.id,因为这是另一个例外.只是想指出这一点,因为这可能是出于兴趣.

Note: I had to choose different application.id for each app since this thew another Exception. Just wanted to point this out since this might be for interest.

在这里我想从另一个应用程序访问我的KTable吗?

What do I miss here to access my KTable from another app?

推荐答案

您正在为两个应用程序使用两个不同的application.id.这样,两个应用程序就完全解耦了.

You are using two different application.id for both applications. Thus, both applications are completely decoupled.

交互式查询是为同一应用程序的不同实例而设计的,不适用于所有应用程序.

Interactive Queries are designed for different instances of the same app, and do not work across applications.

此博客文章可能会有所帮助:

This blog post might help: https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

这篇关于无法通过与StateStore不同的应用程序访问KTable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆