InvalidStateStoreException: the state store is not open in Kafka Streams

Problem description

StreamsBuilder builder = new StreamsBuilder();

Map<String, ?> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

Serde keySerde = getSerde(keyClass);
keySerde.configure(serdeConfig, true);    // configure as key serde

Serde valueSerde = getSerde(valueClass);
valueSerde.configure(serdeConfig, false); // configure as value serde

StoreBuilder<KeyValueStore<K, V>> store =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("mystore"),
        keySerde, valueSerde).withCachingEnabled();

// register "mystore" as a global store fed from "mytopic"
builder.addGlobalStore(store, "mytopic", Consumed.with(keySerde, valueSerde), this::processMessage);

streams = new KafkaStreams(builder.build(), properties);

registerShutdownHook();

streams.start();

readOnlyKeyValueStore = waitUntilStoreIsQueryable("mystore", QueryableStoreTypes.<Object, V>keyValueStore(), streams);


private <T> T waitUntilStoreIsQueryable(final String storeName,
      final QueryableStoreType<T> queryableStoreType,
      final KafkaStreams streams) {

    // 250 attempts x 100 ms sleep = 25 seconds
    long timeout = 250;

    while (timeout>0) {
      try {
        timeout--;
        return streams.store(storeName, queryableStoreType);
      } catch (InvalidStateStoreException ignored) {
        // store not yet ready for querying
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          logger.error(e);
        }
      }
    }
    throw new StreamsException("ReadOnlyKeyValueStore is not queryable within 25 seconds");
  }

The error is as follows:

19:42:35.049 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer - global-stream-thread [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] Updating global state failed. You can restart KafkaStreams to recover from this error.
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {my_component-0=6}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239) [kafka-streams-2.3.0.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290) [kafka-streams-2.3.0.jar:?]
19:42:35.169 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d] Global thread has died. The instance will be in error state and should be closed.
19:42:35.169 [my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread] ERROR org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[my_component.app-91fa5d9f-aba8-4419-a063-93635903ff5d-GlobalStreamThread,5,main] died
org.apache.kafka.streams.errors.StreamsException: Updating global state failed. You can restart KafkaStreams to recover from this error.
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:250) ~[kafka-streams-2.3.0.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290) ~[kafka-streams-2.3.0.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {my_component-0=6}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar:?]
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239) ~[kafka-streams-2.3.0.jar:?]
    ... 1 more

org.apache.kafka.streams.errors.InvalidStateStoreException: State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.

    at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:60)

I see two different exceptions.

  1. InvalidStateStoreException - store is not open

  2. InvalidStateStoreException - store is not available anymore and might have migrated to another instance

I have only one instance of the stream application running on Windows, with an application id.

As shown in the code above, I wait until the store is queryable, but I still get "store not open" and "store may not be available".

What are the possible reasons for the exception (and its solution)?

First of all, is the above code written correctly?

Recommended answer

OffsetOutOfRangeException means that the offsets stored in the state's .checkpoint file are out of range with respect to the offsets of the topic in the Kafka cluster.

This happens when the topic is cleared and/or re-created; it may no longer contain as many messages as the offsets recorded in the checkpoint indicate.

I have found that resetting the .checkpoint file helps. The .checkpoint file looks something like this:

0
1
my_component 0  6
my_component 1  0

Here, 0 is the partition and 6 is the offset. Similarly, 1 is the partition and 0 is the offset.

The entry my_component-0=6 in the exception means that offset 6 of partition 0 of the my_component topic is out of range.

Since the topic was re-created, offset 6 does not exist, so change the 6 to 0.

It is important to note that when unit testing with Kafka, you must clean up the state directory once the test is complete, because the embedded Kafka cluster and its topics no longer exist after the test finishes, so it makes no sense to retain the offsets in your state store (they will have become stale).

So make sure your state directory (typically /tmp/kafka-streams, or C:\tmp\kafka-streams on Windows) is cleaned up after the test.

Also, resetting the checkpoint file is only a workaround; it is not an ideal solution in production.

In production, if the state store is incompatible with its corresponding topic (that is, the offsets are out of range), it means something has been corrupted; possibly someone deleted and re-created the topic.

In such a situation, I think a clean-up may be the only possible solution, because the state store contains stale information that is no longer valid (as far as the new topic is concerned).

