Kafka 流处理器上下文中的定期 NPE [英] Periodic NPE In Kafka Streams Processor Context

查看:35
本文介绍了Kafka 流处理器上下文中的定期 NPE的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用 kafka-streams 0.10.0.0,我在转发消息时定期在 StreamTask 中看到空指针异常.它在 10% 到 50% 的调用之间变化.NPE 发生在这种方法中:

Using kafka-streams 0.10.0.0, I am periodically seeing a null pointer exception in the StreamTask when forwarding a message. It varies between 10% to 50% of the invocations. The NPE occurs in this method:

public <K, V> void forward(K key, V value) {
    ProcessorNode thisNode = currNode;
    try {
        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
            currNode = childNode;
            childNode.process(key, value);
        }
    } finally {
        currNode = thisNode;
    }
}

似乎在某些情况下,thisNode 字段为空.知道是什么原因造成的吗?堆栈跟踪如下.

It seems that in some cases, the thisNode field is null. Any idea what might be causing this ? The stack trace is below.

[ERROR] 2016-08-21 14:50:39.288 [StreamThread-1] StreamedMetricMeter - Forwarding failed
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336) ~[kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) ~[kafka-streams-0.10.0.0.jar:?]
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.forward(AbstractStreamedMetricProcessor.java:552) [classes/:?]
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:89) [classes/:?]
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:1) [classes/:?]
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.process(AbstractStreamedMetricProcessor.java:166) [classes/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) [kafka-streams-0.10.0.0.jar:?]

推荐答案

问题是我的 ProcessorSupplier 每次调用 get.反过来,Kafka Streams 引擎试图创建多个处理器实例,这无疑引发了多线程垃圾箱火灾.请注意同样粗心的人...... ProcessorSupplier.get() 应该在每次调用时返回一个新的处理器实例.

The problem was that my ProcessorSuppliers were returning the same instance of the Processor for every call to get. In turn, the Kafka Streams engine was attempting to create multiple processor instances, which I have no doubt created a multi-threaded dumpster fire. Note to the similarly unwary.... ProcessorSupplier.get() should return a new instance of a processor on each call.

这篇关于Kafka 流处理器上下文中的定期 NPE的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆