Spring Cloud Stream Kafka Stream - How to handle runtime exceptions?
Question
I am struggling to customize my Spring Kafka Streams application. I have been trying to configure handling of uncaught (runtime) exceptions in my KStreams.
@Configuration
@Slf4j
public class CustomKafkaStreamsConfiguration {
    @Bean
    public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
        return factoryBean -> {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
                        log.error("An exception has occurred={}", e.getMessage());
                    });
                }
            });
        };
    }
}
Later on I have a KStream:
@Bean
public Function<KStream<String, Transaction>,
        KStream<String, Transaction>> paymentExecution() {
    return stream -> stream
            .peek((k, v) -> {
                if (v.getStatus().equals(PaymentStatus.UNKNOWN)) {
                    throw new IllegalStateException();
                }
            });
}
If I send a transaction with status UNKNOWN, the StreamThread terminates with an IllegalStateException and my KStream no longer consumes any incoming requests.
Exception in thread "payment-d4b6ddd2-40ab-4eeb-afe4-e7fc3caa2b9c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=payment-request, partition=0, offset=13, stacktrace=java.lang.IllegalStateException
at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionRequestProcessor.java:48)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.IllegalStateException
at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionOrchestratorProcessor.java:48)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
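Am I missing something? Or is there a more appropriate way to handle runtime exceptions thrown while processing a stream?
After the runtime exception, I would like the offset of the failing event to be committed, and I would like StreamThread-1 to keep consuming events.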
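Answer
I believe you can change the setUncaughtExceptionHandler call to use the StreamsUncaughtExceptionHandler variant (available since Kafka Streams 2.8), like this: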
kafkaStreams.setUncaughtExceptionHandler(throwable -> {
    log.error("An exception has occurred={}", throwable.getMessage());
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
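Essentially, REPLACE_THREAD should replace the failed stream thread with a new one instead of shutting down the application/stream, so that processing continues. See this post for more details: https://stackoverflow.com/a/68058414/1799107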
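One caveat worth keeping in mind: as far as I understand, replacing the thread does not by itself skip the failing record, because its offset was never committed, so the new thread may hit the same record again. If the goal is to log the bad event and move on, a complementary option is to catch the exception inside the per-record logic so it never reaches the stream thread. The following is only a minimal sketch of that idea using plain JDK types; the class name `SafeRecordAction`, the helper `skippingFailures`, and the `failures` list are hypothetical names introduced for illustration and are not part of Kafka Streams or Spring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical helper, not part of Kafka Streams or Spring.
class SafeRecordAction {

    // Wraps a per-record action so that a RuntimeException is recorded
    // (here: appended to a list) instead of propagating to the caller.
    static <K, V> BiConsumer<K, V> skippingFailures(BiConsumer<K, V> action,
                                                    List<String> failures) {
        return (key, value) -> {
            try {
                action.accept(key, value);
            } catch (RuntimeException e) {
                // In a real application this is where you would log the
                // record or forward it to a dead-letter topic.
                failures.add(key + ": " + e.getClass().getSimpleName());
            }
        };
    }

    public static void main(String[] args) {
        List<String> failures = new ArrayList<>();

        // Stand-in for the validation done inside peek() in the question.
        BiConsumer<String, String> validate = (key, status) -> {
            if (status.equals("UNKNOWN")) {
                throw new IllegalStateException();
            }
        };

        BiConsumer<String, String> safe = skippingFailures(validate, failures);
        safe.accept("tx-1", "EXECUTED");
        safe.accept("tx-2", "UNKNOWN"); // fails, but does not stop the loop
        safe.accept("tx-3", "EXECUTED");

        System.out.println(failures);
    }
}
```

In a topology, a wrapper like this could presumably be passed to `peek` as a method reference (for example `stream.peek(safe::accept)`), since `ForeachAction` takes a key and a value. The trade-off is that failures are swallowed where they occur, so they need to be logged or routed somewhere visible rather than silently dropped.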