处理 Kafka 流中的异常 [英] Handling exceptions in Kafka streams

查看:69
本文介绍了处理 Kafka 流中的异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

已经浏览了多个帖子,但其中大部分都是相关的处理坏消息,而不是处理它们时的异常处理.

Had gone through multiple posts but most of them are related handling Bad messages not about exception handling while processing them.

我想知道如何处理流应用程序接收到的消息并且在处理消息时出现异常?异常可能是由于多种原因造成的,例如网络故障、运行时异常等,

I want to know to how to handle the messages that is been received by the stream application and there is an exception while processing the message? The exception could be because of multiple reasons like Network failure, RuntimeException etc.,

  • 有人可以建议什么是正确的做法吗?我应该使用setUncaughtExceptionHandler?或者有更好的方法吗?
  • 如何处理重试?
  • Could someone suggest what is the right way to do? Should I use setUncaughtExceptionHandler? or is there a better way?
  • How to handle retries?

推荐答案

这取决于您想对生产者端的异常做什么.如果生产者会抛出异常(例如,由于网络故障或 kafka 代理已死亡),则默认情况下流将死亡.使用 kafka-streams 1.1.0 版,您可以通过实现 ProductionExceptionHandler 来覆盖默认行为,如下所示:

it depends what do you want to do with exceptions on producer side. if exception will be thrown on producer (e.g. due to Network failure or kafka broker has died), stream will die by default. and with kafka-streams version 1.1.0 you could override default behavior by implementing ProductionExceptionHandler like the following:

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]",  new String(record.value()), record.topic(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
    }

}

从句柄方法你可以返回 CONTINUE 如果你不希望流因异常而死亡,返回 FAIL 以防你想要流停止(FAIL 是默认值)).并且您需要在流配置中指定此类:

from handle method you could return either CONTINUE if you don't want streams dying on exception, on return FAIL in case you want stream stops (FAIL is default one). and you need specify this class in stream config:

default.production.exception.handler=com.example.CustomProductionExceptionHandler

还要注意ProductionExceptionHandler只处理producer上的异常,不会处理流方法mapValues(..)filter处理消息时的异常(..), branch(..) 等,你需要用 try/catch 块包装这些方法逻辑(把你所有的方法逻辑放到 try 块中,以保证你会处理所有例外情况):

Also pay attention that ProductionExceptionHandler handles only exceptions on producer, and it will not handle exceptions during processing message with stream methods mapValues(..), filter(..), branch(..) etc, you need to wrap these method logic with try / catch blocks (put all your method logic into try block to guarantee that you will handle all exceptional cases):

.filter((key, value) -> { try {..} catch (Exception e) {..} })

据我所知,我们不需要在消费者端显式处理异常,因为 kafka 流将在稍后自动重试消费(因为在消息被消费和处理之前不会更改偏移量);例如如果一段时间内无法访问 kafka 代理,您将收到来自 kafka 流的异常,并且当中断时,kafka 流将消耗所有消息.所以在这种情况下,我们只会延迟,不会有任何损坏/丢失.

as I know, we don't need to handle exceptions on consumer side explicitly, as kafka streams will retry automatically consuming later (as offset will not be changed until messages will be consumed and processed); e.g. if kafka broker will be not reachable for some time, you will got exceptions from kafka streams, and when broken will be up, kafka stream will consume all messages. so in this case we will have just delay and nothing corrupted/lost.

使用 setUncaughtExceptionHandler 您将无法像使用 ProductionExceptionHandler 那样更改默认行为,使用它您只能记录错误或将消息发送到失败主题.

with setUncaughtExceptionHandler you will not be able to change default behavior like with ProductionExceptionHandler, with it you could only log error or send message into failure topic.

更新自 kafka-streams 2.8.0

kafka-streams 2.8.0开始,你有能力自动替换失败的流线程(由未捕获的异常引起)使用 KafkaStreams 方法 void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD.有关更多详细信息,请查看 Kafka Streams 特定的未捕获异常处理程序

since kafka-streams 2.8.0, you have the ability to automatically replace failed stream thread (that caused by uncaught exception) using KafkaStreams method void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh); with StreamThreadExceptionResponse.REPLACE_THREAD. For more details please take a look at Kafka Streams Specific Uncaught Exception Handler

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

这篇关于处理 Kafka 流中的异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆