使用低级 Kafka Stream API 执行 context.forward() 时的 NPE [英] NPE while doing context.forward() using low-level Kafka Stream API
问题描述
我使用低级 Kafka API 构建了一个普通的 Kafka 流 API.拓扑是线性的.
I have built a plain Kafka streams API using Low-level Kafka API. The topology is linear.
p1 ->p2 ->p3
p1 -> p2 -> p3
在执行 context.forward() 时,我得到了 NPE,这里是片段:
While doing context.forward(), I am getting NPE, snippet here:
NAjava.lang.NullPointerException: null
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:178)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
...
我使用的是 Kafka Stream 2.3.0.
I am using Kafka Stream 2.3.0.
我看到一个类似的 SO 问题 [here][1],该问题基于非常旧的版本.那么,不确定这是否是相同的错误?
I see a similar SO question [here][1], and the question is based on the very old version. So, not sure if this is the same error?
我正在添加更多信息,保留我正在做的事情的要点
I am putting some more info, keeping the Gist of what I am doing
public class SP1Processor implements StreamProcessor {
private StreamProcessingContext ctxt;
// In init(), create a single thread pool
// which does some processing and sends the
// data to next processor
@Override
void init(StreamProcessingContext ctxt) {
this.ctxt = ctxt;
// Create a thread pool, do some work
// and then do this.ctxt.forward(K,V)
// Not showing code of Thread pool
// Strangely, inside this thread pool,
// this.ctxt isn't same what I see in process()
// shouldn't it be same? ctxt is member variable
// and shouldn't it be same
// this.ctxt.forward(K,V) here in this thread pool is causing NPE
// why does it happen?
this.ctxt.forward(K,V);
}
@Override
void process(K,V) {
// Here do some processing and go to the next processor chain
// This works fine
this.ctxt.forward(K,V);
}
}
[1]: https://stackoverflow.com/questions/39067846/periodic-npe-in-kafka-streams-processor-context
推荐答案
看起来它可能与链接的问题是同一个问题,尽管我们在您的案例中讨论的是更现代的版本.确保 ProcessorSupplier.get()
每次调用时都会返回一个新实例.
It looks like it could be the same issue as the linked question, although we are talking about a much more contemporary version in your case.
Make sure that ProcessorSupplier.get()
returns a new instance each time it is called.
这篇关于使用低级 Kafka Stream API 执行 context.forward() 时的 NPE的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!