Continue consuming subsequent records in reactor kafka after deserialization exception


Problem Description

I am using reactor kafka and have a custom AvroDeserializer class for deserialization of messages.

Now I have a case where, for certain payloads, the deserialization class throws an exception. My Kafka listener dies as soon as it tries to read such a record. I tried handling this exception using onErrorReturn and using a combination of doOnError and onErrorContinue; that helped log the exception, but the consumer still failed to consume subsequent records.

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

   @Override
   public T deserialize( String topic, byte[] data ) {
       try {
         // logic to deserialize and return the record
       }
       catch ( Exception e ) {
          throw new SerializationException( "error deserializing", e );
       }
   }
}

At the listener end, I'm trying to handle it like this:

@EventListener(ApplicationStartedEvent.class)
public void listener() {
   KafkaReceiver<String, Object> receiver; // kafka receiver
   receiver.receive()
   .delayUntil(/* do something */) // logic to update the record in the db
   .doOnError(/* handle error */) // trying to handle the exceptions, including the deserialization exception - logging them to a system
   .onErrorContinue((throwable, o) -> log.info("continuing"))
   .doOnNext(r -> r.receiverOffset().acknowledge())
   .subscribe();
}

One option is not to throw an exception from the Deserializer class, but I want to log such exceptions in a separate system for analytics, and hence want to handle such records at the Kafka listener end. Any thoughts on how this can be achieved?
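One pattern worth considering (an assumption on my part, not something from the original post) is to consume the raw bytes with a plain ByteArrayDeserializer and move the Avro decoding into your own pipeline step, where an ordinary try/catch can log the poison record and move on, instead of the failure happening inside the Kafka client where it kills the poll. The sketch below strips out the kafka/reactor dependencies entirely and just shows the skip-and-continue shape; the class name, the fake payloads, and the `deserialize` method are all illustrative stand-ins.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class DeserializeDownstream {

    // Hypothetical stand-in for the real Avro decode step; throws on a bad payload.
    static String deserialize(byte[] data) {
        String s = new String(data, StandardCharsets.UTF_8);
        if (s.startsWith("bad")) {
            throw new RuntimeException("error deserializing: " + s);
        }
        return s.toUpperCase();
    }

    public static void main(String[] args) {
        // Simulated raw records, as a ByteArrayDeserializer would hand them over.
        List<byte[]> records = List.of(
                "order-1".getBytes(StandardCharsets.UTF_8),
                "bad-payload".getBytes(StandardCharsets.UTF_8),
                "order-2".getBytes(StandardCharsets.UTF_8));

        for (byte[] record : records) {
            try {
                System.out.println("processed: " + deserialize(record));
            } catch (RuntimeException e) {
                // Log to the separate analytics system here, then keep consuming.
                System.out.println("skipped: " + e.getMessage());
            }
        }
    }
}
```

Because the failure is now just an exception in your own code, the bad record can be logged and skipped without the consumer ever stalling.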

Recommended Answer

Looks like the suggestion in the comment about using

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

will work in most cases. But I couldn't figure out a way to make it work in Reactor Spring Kafka. For now, I went ahead with the approach of not throwing an exception from the deserializer and adding the logic to log it there itself, and that solves the issue of the Kafka consumer not being able to consume subsequent records after the poison record.
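The approach the answer settles on, catching inside the deserializer, logging, and returning null so downstream code can simply filter nulls out, can be sketched as below. This is a minimal illustration with no kafka dependencies; the class name, payloads, and `deserializeOrNull` helper are hypothetical, and in the real deserializer the logging line would be the call into the separate analytics system the question mentions.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;

public class NonThrowingDeserializer {

    // Returns null instead of throwing, mirroring the answer's approach.
    static String deserializeOrNull(byte[] data) {
        try {
            String s = new String(data, StandardCharsets.UTF_8);
            if (s.startsWith("bad")) {
                throw new RuntimeException("corrupt avro payload");
            }
            return s.toUpperCase();
        } catch (RuntimeException e) {
            // In the real deserializer, log to the separate analytics system here.
            System.out.println("logged poison record: " + e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        List<byte[]> records = List.of(
                "order-1".getBytes(StandardCharsets.UTF_8),
                "bad-payload".getBytes(StandardCharsets.UTF_8),
                "order-2".getBytes(StandardCharsets.UTF_8));

        // Downstream filters out the nulls, so consumption never stalls.
        records.stream()
                .map(NonThrowingDeserializer::deserializeOrNull)
                .filter(Objects::nonNull)
                .forEach(v -> System.out.println("consumed: " + v));
    }
}
```

The trade-off is that the consumer now has to treat null as "bad record" everywhere downstream, which is why the question author originally preferred handling it at the listener end.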

