Kafka Streams永久存储错误:状态存储,可能已迁移到另一个实例 [英] Kafka Streams persistent store error: the state store, may have migrated to another instance

查看:75
本文介绍了Kafka Streams永久存储错误:状态存储,可能已迁移到另一个实例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在Spring Boot中使用Kafka Streams.在我的用例中,当我从其他微服务收到客户事件时,我需要存储在客户实例化视图中,当我收到 order 事件时,我需要加入客户并订购然后存储在客户订单实例化视图中.为此,我创建了持久键值存储 customer-store

I am using Kafka Streams with Spring Boot. In my use case when I receive customer event from other microservice I need to store in customer materialized view and when I receive order event, I need to join customer and order then store in customer-order materialized view. To achieve this I created persistent key-value store customer-store and updating this when a new event comes.

    StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer"),Serdes.String(), customerSerde).withLoggingEnabled(new HashMap<>());
streamsBuilder.addStateStore(customerStateStore);
KTable<String,Customer> customerKTable=streamsBuilder.table("customer",Consumed.with(Serdes.String(),customerSerde));
    customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: "+value)));

已配置的拓扑,流和已启动的流对象.当我尝试使用ReadOnlyKeyValueStore访问存储区时,即使我刚才存储了一些对象,也遇到了以下异常

Configured Topology, Streams and started streams object. When I try to access store using ReadOnlyKeyValueStore, I got the following exception, even though I stored some objects few moments ago

streams.start();
ReadOnlyKeyValueStore<String, Customer> customerStore = streams.store("customer", QueryableStoreTypes.keyValueStore());
System.out.println("customerStore.approximateNumEntries()-> " + customerStore.approximateNumEntries());

代码已上传到 Github 以供参考.感谢您的帮助.

Code uploaded to Github for reference. Appreciate your help.

例外:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, customer, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
    at com.kafkastream.service.EventsListener.main(EventsListener.java:94)

推荐答案

状态存储通常需要一些时间来准备.最简单的方法如下所示. (官方文件中的代码)

The state store needs some time to be prepared usually. The simplest approach is like below. (code from the official document)

public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                              final QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams streams) throws InterruptedException {
  while (true) {
    try {
      return streams.store(storeName, queryableStoreType);
    } catch (InvalidStateStoreException ignored) {
      // store not yet ready for querying
      Thread.sleep(100);
    }
  }
}

您可以在文档中找到其他信息. https://docs.confluent.io/current/streams/faq. html#interactive-queries

You can find additional info in the document. https://docs.confluent.io/current/streams/faq.html#interactive-queries

这篇关于Kafka Streams永久存储错误:状态存储,可能已迁移到另一个实例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆