InvalidStateStoreException: the state store is not open in Kafka Streams


Problem description

StreamsBuilder builder = new StreamsBuilder();

    Map<String, ?> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

    Serde keySerde= getSerde(keyClass);
    keySerde.configure(serdeConfig,true);

    Serde valueSerde = getSerde(valueClass);
    valueSerde.configure(serdeConfig,false);

    StoreBuilder<KeyValueStore<K,V>> store =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("mystore"),
            keySerde,valueSerde).withCachingEnabled();

    builder.addGlobalStore(store,"mytopic", Consumed.with(keySerde,valueSerde),this::processMessage);

    streams=new KafkaStreams(builder.build(),properties);

    registerShutdownHook();

    streams.start();

    readOnlyKeyValueStore = waitUntilStoreIsQueryable("mystore", QueryableStoreTypes.<Object, V>keyValueStore(), streams);


private <T> T waitUntilStoreIsQueryable(final String storeName,
      final QueryableStoreType<T> queryableStoreType,
      final KafkaStreams streams) {

    // 250 attempts with a 100 ms pause between them: roughly 25 seconds
    int attemptsLeft = 250;

    while (attemptsLeft > 0) {
      try {
        attemptsLeft--;
        return streams.store(storeName, queryableStoreType);
      } catch (InvalidStateStoreException ignored) {
        // store not yet ready for querying
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          // restore the interrupt flag and stop waiting instead of swallowing it
          Thread.currentThread().interrupt();
          throw new StreamsException("Interrupted while waiting for store " + storeName, e);
        }
      }
    }
    throw new StreamsException("ReadOnlyKeyValueStore is not queryable within 25 seconds");
  }

The error is as follows:

19:42:35.049 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer - global-stream-thread [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] Updating global state failed. You can restart KafkaStreams to recover from this error.
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {my_component-0=6}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239) [kafka-streams-2.3.0.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290) [kafka-streams-2.3.0.jar:?]
19:42:35.169 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d] Global thread has died. The instance will be in error state and should be closed.
19:42:35.169 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread,5,main] died
org.apache.kafka.streams.errors.StreamsException: Updating global state failed. You can restart KafkaStreams to recover from this error.
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:250) ~[kafka-streams-2.3.0.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290) ~[kafka-streams-2.3.0.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {my_component-0=6}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239) ~[kafka-streams-2.3.0.jar:?]
    ... 1 more

org.apache.kafka.streams.errors.InvalidStateStoreException: State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.

    at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:60)

I see two different exceptions.

  1. InvalidStateStoreException - the store is not open

  2. InvalidStateStoreException - the store is not available any more and may have been migrated to another instance

I have only one instance of the Streams application running on Windows, with a single application ID.

In the code above, I wait until the store is queryable, but I still get "store is not open" and "store may not be available" errors.

What are the possible causes of these exceptions, and how can they be fixed?

First of all, is the code above correct?

Recommended answer

OffsetOutOfRangeException means that the offsets recorded for the state store in its .checkpoint file fall outside the range of offsets that currently exist for the topic in the Kafka cluster.

This happens when the topic has been cleared and/or re-created, so it may no longer contain as many messages as the checkpointed offset implies. As the log shows, the exception kills the global stream thread and puts the instance into the error state, which is why the store is then reported as not open or no longer available.

I have found that resetting the .checkpoint file helps. The .checkpoint file looks something like this:

0
1
my_component 0  6
my_component 1  0

Here, 0 is the partition and 6 is the offset. Similarly, 1 is the partition and 0 is the offset.

The entry {my_component-0=6} in the exception means that offset 6 of partition 0 of the my_component topic is out of range.

Since the topic was re-created, offset 6 no longer exists, so change 6 to 0.
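The manual edit described above can also be scripted. The following is only a sketch that assumes the layout shown in the sample file: entry lines have three whitespace-separated fields (topic, partition, offset), and the two leading lines are treated as a header and copied unchanged, since the answer does not explain them. The class and method names are hypothetical, and the file should only be touched while the Streams application is stopped.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams.
final class CheckpointReset {

    // Rewrites every "topic partition offset" entry so that its offset is 0.
    // Assumption: lines that do not have exactly three fields (such as the
    // two leading lines in the sample file) are copied unchanged.
    static List<String> resetOffsets(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length == 3) {
                out.add(fields[0] + " " + fields[1] + " 0");
            } else {
                out.add(line);
            }
        }
        return out;
    }

    // Reads the checkpoint file, resets the offsets, and writes it back.
    static void resetFile(Path checkpoint) throws IOException {
        List<String> lines = Files.readAllLines(checkpoint, StandardCharsets.UTF_8);
        Files.write(checkpoint, resetOffsets(lines), StandardCharsets.UTF_8);
    }
}
```

Calling `CheckpointReset.resetFile(path)` on the sample file would turn the entry for partition 0 of my_component from offset 6 into offset 0 and leave the other lines as they were.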

It is important to note that when unit testing Kafka Streams, you must clean up the state directory once the test is complete. Your embedded Kafka cluster and its topics no longer exist after the test, so the offsets retained in the state store become stale and there is no point in keeping them.

So, ensure that your state directory (typically /tmp/kafka-streams, or C:\tmp\kafka-streams on Windows) is cleaned up after the test.
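One way to do that cleanup from a test teardown is sketched below, using only the JDK. The class and method names are hypothetical, and the directory to delete is whatever your state.dir setting points at. Kafka Streams also offers KafkaStreams#cleanUp(), which removes the local state for the application ID and may only be called while the instance is not running; the sketch is for cases where no KafkaStreams instance is at hand.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical test helper, not part of Kafka Streams.
final class StateDirCleaner {

    // Deletes the directory and everything below it; does nothing if it is absent.
    static void deleteRecursively(Path stateDir) throws IOException {
        if (!Files.exists(stateDir)) {
            return;
        }
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(stateDir)) {
            // Reverse order lists children before their parent directories.
            ordered = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : ordered) {
            Files.delete(p);
        }
    }
}
```

In a JUnit test this could be called from the teardown method, for example `StateDirCleaner.deleteRecursively(Paths.get("/tmp/kafka-streams"))`.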

Also, resetting the checkpoint file is only a workaround and is not an ideal solution in production.

In production, if the state store is incompatible with its corresponding topic (that is, the offsets are out of range), it means there is some corruption; possibly someone deleted and re-created the topic.

In such a situation, I think cleaning up the state may be the only possible solution, because your state store contains stale information that is no longer valid as far as the new topic is concerned.
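Separately from the cleanup, the wait loop in the question keeps retrying for the full 25 seconds even after the log has reported "Global thread has died. The instance will be in error state and should be closed." A bounded wait that also checks for a fatal condition would surface the real problem sooner. The sketch below is generic JDK code with hypothetical names; it is not a Kafka Streams API and does not fix the stale checkpoint itself.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka Streams.
final class BoundedWait {

    // Calls lookup until it succeeds, the attempts run out, or isFatal reports
    // that waiting is pointless. A RuntimeException thrown by lookup is treated
    // as "not ready yet" (the question catches InvalidStateStoreException).
    static <T> T await(Supplier<T> lookup, BooleanSupplier isFatal,
                       int maxAttempts, long sleepMillis) throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (isFatal.getAsBoolean()) {
                throw new IllegalStateException(
                    "instance failed; giving up after " + attempt + " attempts", last);
            }
            try {
                return lookup.get();
            } catch (RuntimeException notReady) {
                last = notReady;
                Thread.sleep(sleepMillis);
            }
        }
        throw new IllegalStateException("not ready after " + maxAttempts + " attempts", last);
    }
}
```

With the question's code, lookup would be `() -> streams.store(storeName, queryableStoreType)` and isFatal could be `() -> streams.state() == KafkaStreams.State.ERROR`, with 250 attempts and a 100 ms sleep to keep the original 25-second budget.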
