Kafka流处理器上下文中的定期NPE [英] Periodic NPE In Kafka Streams Processor Context
问题描述
使用kafka-streams 0.10.0.0,转发消息时,我定期在StreamTask中看到空指针异常.它在调用的10%到50%之间变化.NPE会通过这种方法发生:
Using kafka-streams 0.10.0.0, I am periodically seeing a null pointer exception in the StreamTask when forwarding a message. It varies between 10% to 50% of the invocations. The NPE occurs in this method:
public <K, V> void forward(K key, V value) {
ProcessorNode thisNode = currNode;
try {
for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
currNode = childNode;
childNode.process(key, value);
}
} finally {
currNode = thisNode;
}
}
在某些情况下, thisNode 字段似乎为空.知道是什么原因造成的吗?堆栈跟踪在下面.
It seems that in some cases, the thisNode field is null. Any idea what might be causing this ? The stack trace is below.
[ERROR] 2016-08-21 14:50:39.288 [StreamThread-1] StreamedMetricMeter - Forwarding failed
java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336) ~[kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) ~[kafka-streams-0.10.0.0.jar:?]
at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.forward(AbstractStreamedMetricProcessor.java:552) [classes/:?]
at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:89) [classes/:?]
at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:1) [classes/:?]
at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.process(AbstractStreamedMetricProcessor.java:166) [classes/:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) [kafka-streams-0.10.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) [kafka-streams-0.10.0.0.jar:?]
推荐答案
问题是我的获取.反过来,Kafka Streams引擎正在尝试创建多个处理器实例,毫无疑问,我创建了多线程垃圾箱.请注意类似的粗心.... ProcessorSupplier.get()应该在每次调用时返回一个新的处理器实例.
The problem was that my ProcessorSuppliers were returning the same instance of the Processor for every call to get. In turn, the Kafka Streams engine was attempting to create multiple processor instances, which I have no doubt created a multi-threaded dumpster fire. Note to the similarly unwary.... ProcessorSupplier.get() should return a new instance of a processor on each call.
这篇关于Kafka流处理器上下文中的定期NPE的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!