状态存储可能已迁移到另一个实例 [英] state store may have migrated to another instance

查看:18
本文介绍了状态存储可能已迁移到另一个实例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我尝试从流访问状态存储时,出现以下错误.

When I try to access state store from stream, I am getting below error.

状态存储 count-store 可能已迁移到另一个实例.

The state store, count-store, may have migrated to another instance.

当我尝试从商店访问 ReadOnlyKeyValueStore 时,在迁移到其他服务器时收到错误消息.但我只有一个经纪人正在运行

When I tried to access ReadOnlyKeyValueStore from store, getting error message as migrated to other server. but am having only one broker is up and running

/**
 * 
 */
package com.ms.kafka.com.ms.stream;

import java.util.Properties;
import java.util.stream.Stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import com.ms.kafka.com.ms.entity.TrackingEvent;
import com.ms.kafka.com.ms.entity.TrackingEventDeserializer;
import com.ms.kafka.com.ms.entity.TrackingEvnetSerializer;


/**
 * @author vettri
 *
 */
public class EventStreamer {

    /**
     * 
     */
    public EventStreamer() {
        // TODO Auto-generated constructor stub
    }
    
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "trackeventstream_stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CLIENT_ID_CONFIG,"testappdi");
        props.put("auto.offset.reset","earliest");
        /*
         * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
         * Serdes.String().getClass());
         * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
         * Serdes.String().getClass());
         */

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String , TrackingEvent> eventStream = builder.stream("rt_event_command_topic_stream",Consumed.with(Serdes.String(),
                Serdes.serdeFrom(new TrackingEvnetSerializer(), new TrackingEventDeserializer())));
        KTable<String, Long> groupedByUniqueId = eventStream.groupBy((k,v) -> v.getUniqueid()).
                count(Materialized.as("count-store"));
        
        /*
         * KTable<Integer, Integer> table = builder.table( "rt_event_topic_stream",
         * Materialized.as("queryable-store-name"));
         */
        
        //eventStream.filter((k,v) -> "9de3b676-b20f-4b7a-878b-526fd5948a34".equalsIgnoreCase(v.getUniqueid())).foreach((k,v) -> System.out.println(v));
        final KafkaStreams stream = new KafkaStreams(builder.build(), props);
        stream.cleanUp();
        stream.start();
        System.out.println("Strema state : "+stream.state().name());
        String queryableStoreName = groupedByUniqueId.queryableStoreName();
        /*
         * ReadOnlyKeyValueStore keyValStore1 =
         * waitUntilStoreIsQueryable(queryableStoreName, (QueryableStoreTypes)
         * QueryableStoreTypes.keyValueStore(),stream);
         */ ReadOnlyKeyValueStore<Long , TrackingEvent> keyValStore = stream.store(queryableStoreName, QueryableStoreTypes.<Long,TrackingEvent>keyValueStore());
        
       // System.out.println("results --> "+keyValStore.get((long) 158));
        //streams.close();
    }
    
    public static <T> T waitUntilStoreIsQueryable(final String storeName,
            final QueryableStoreTypes queryableStoreType, final KafkaStreams streams) throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, (QueryableStoreType<T>) queryableStoreType);
            } catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
                System.out.println("system is waitng to ready for state store");
                Thread.sleep(100);
                //streams.close();
            }
        }
}

}

我需要检索我存储在状态存储中的数据,

I need to retrieve the data that i stored in state store,

我想做的是,需要将其存储在本地并检索强文本

what am trying to do is, need store it in local and retrievestrong text

推荐答案

在您的情况下,本地 KafkaStreams 实例尚未准备好,因此无法查询其本地状态存储.

In your case the local KafkaStreams instance is not yet ready and thus its local state stores cannot be queried yet.

在查询之前,您应该等待 KafkaStreams 处于 RUNNING 状态.你需要打电话给你 waitUntilStoreIsQueryable(...).

Before querying you should wait for KafkaStreams to be in RUNNING status. You need call you waitUntilStoreIsQueryable(...).

示例可以在 Confluent github 中找到:

Example can be found in Confluent github:

waitUntilStoreIsQueryable(...) 的使用

可以在此处找到有关原因的更多详细信息:https://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-另一个实例

More details regarding cause can be found here: https://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

这篇关于状态存储可能已迁移到另一个实例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆