java.lang.IllegalStateException:这不应该发生,因为 headers() 应该只在处理记录时调用 [英] java.lang.IllegalStateException: This should not happen as headers() should only be called while a record is processed

查看:18
本文介绍了java.lang.IllegalStateException:这不应该发生,因为 headers() 应该只在处理记录时调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

启动 Stream 应用程序(使用 Kafka Streams)失败并显示java.lang.IllegalStateException:这不应该发生,因为 headers() 应该只在处理记录时调用"

Starting up a Stream application (using Kafka Streams) fails with the "java.lang.IllegalStateException: This should not happen as headers() should only be called while a record is processed"

如果主题中已有数据,这似乎只会在我启动应用程序时发生.如果主题为空并且我开始向其推送数据,则一切正常.

This seems to only happen when I start up the application if there is already data in the topic. If the topic is empty and I start pushing data to it, all is fine.

有人知道为什么会这样吗?

Would someone know why this would happen?

谢谢

This should not happen as headers() should only be called while a record is processed
java.lang.IllegalStateException: This should not happen as headers() should only be called while a record is processed
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.headers(AbstractProcessorContext.java:149)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:235)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:222)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:37)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:153)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:446)
        at fr.mediametrie.GeoIPEnrichmentProcessor.transform(GeoIPEnrichmentProcessor.java:97)
        at fr.mediametrie.GeoIPEnrichmentProcessor.transform(GeoIPEnrichmentProcessor.java:1)
        at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:47)
        at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:36)
        at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:56)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)

推荐答案

基于 Matthias 共享的信息,这里有一个实际示例:

Building on the information Matthias shared, here's a practical example:

  • 返回相同的转换器对象(也适用于 Processor 和 ValueTransformer)会导致 IllegalStateException:
var transformer = new Transformer();

inputStream
  .transformValues(() -> transformer)
// further operations...

  • 每次返回一个新实例可以防止异常:
  • inputStream
      .transformValues(() -> new Transformer())
    // further operations...
    
    

    ps:我知道文档很清楚,但希望这可以节省一些滚动时间.

    p.s: I am aware that the documentation is quite clear, but hope this saves some scrolling.

    这篇关于java.lang.IllegalStateException:这不应该发生,因为 headers() 应该只在处理记录时调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆