春光云流Kafka Stream-如何处理运行时异常? [英] Spring Cloud Stream Kafka Stream - How to handle runtime exceptions?

查看:24
本文介绍了春光云流Kafka Stream-如何处理运行时异常?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为我的Spring Kafka Streams应用程序的定制而苦苦挣扎。 我一直在尝试在我的KStreams中配置未捕获(运行时异常)处理。

参照文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder--应该这样做:

@Configuration
@Slf4j
public class CustomKafkaStreamsConfiguration {

    @Bean
    public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
        return factoryBean -> {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
                        log.error("An exception has occurred={}", e.getMessage()) ;
                    });

                }
            });
        };
    }

}

稍后我有一个KStream

    @Bean
    public Function<KStream<String, Transaction>,
            KStream<String, Transaction>> paymentExecution() {
        return stream -> stream
                .peek((k, v) -> {
                    if (v.getStatus().equals(PaymentStatus.UNKNOWN)) {
                        throw new IllegalStateException();
                    }
                });
    }

如果我发送状态未知的事务StreamThread因IlLegalStateException而终止,则我的KStream不再使用任何传入请求。

Exception in thread "payment-d4b6ddd2-40ab-4eeb-afe4-e7fc3caa2b9c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=payment-request, partition=0, offset=13, stacktrace=java.lang.IllegalStateException
    at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionRequestProcessor.java:48)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.IllegalStateException
    at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionOrchestratorProcessor.java:48)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)

我是不是错过了什么?或者有更合适的方法来处理在处理流时引发的运行时异常?

在运行时异常之后,我希望提交该异常事件,然后我希望我StreamThread-1仍然能够使用事件。

推荐答案

我相信您可以将setUncaughtExceptionHandler修改为如下所示

(throwable -> {
 log.error("An exception has occurred={}", e.getMessage()) ;
 return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; });

本质上,REPLACE_THREAD应该关闭当前线程,而不是终止应用程序/流并继续流。请参阅此帖子以了解更多详细信息 https://stackoverflow.com/a/68058414/1799107

这篇关于春光云流Kafka Stream-如何处理运行时异常?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆