Using onErrorResume to handle problematic payloads posted to Kafka using Reactor Kafka


Problem description

I am using Reactor Kafka to send Kafka messages, and to receive and process them. While receiving the Kafka payload, I do some deserialization; if there is an exception, I want to just log that payload (by saving it to Mongo) and then continue receiving other payloads.

For this I am using the approach below:

@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
    for (Flux<ReceiverRecord<String, Object>> flux : kafkaService.getFluxReceives()) {
        flux.delayUntil(//some function to do something)
            .doOnNext(r -> r.receiverOffset().acknowledge())
            .onErrorResume(this::handleException) // here I'll just save to Mongo
            .subscribe();
    }
}

private Publisher<? extends ReceiverRecord<String, Object>> handleException(Throwable ex) {
    // save to Mongo
    return Flux.empty();
}

Here I expect that whenever I encounter an exception while receiving a payload, onErrorResume should catch it and log to Mongo, and then I should be able to continue receiving more messages when I send to the Kafka queue. However, I see that after the exception, even though the onErrorResume method gets invoked, I am not able to process any more messages sent to the Kafka topic. Anything I might be missing here?
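This behaviour is consistent with Reactive Streams semantics: an error is a terminal signal, so onErrorResume switches to the fallback publisher and the original receiver Flux is finished; remaining records are never delivered to this subscription. A rough analogy of "catch outside the pipeline vs. catch inside the per-element step" can be sketched with only java.util.stream (illustrative only, not the Reactor API — the class and method names below are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class ErrorChannelDemo {

    // Catching OUTSIDE the pipeline: the whole stream dies at "bad",
    // so "c" is never processed (analogous to onErrorResume ending the Flux).
    static List<String> catchOutside() {
        List<String> seen = new ArrayList<>();
        try {
            Stream.of("a", "bad", "c").forEach(s -> {
                if (s.equals("bad")) throw new IllegalArgumentException(s);
                seen.add(s);
            });
        } catch (IllegalArgumentException ex) {
            // error handled, but the rest of the stream is already gone
        }
        return seen;
    }

    // Catching INSIDE the per-element step: processing continues with "c"
    // (analogous to handling the failure inside the deserialization step).
    static List<String> catchInside() {
        List<String> seen = new ArrayList<>();
        Stream.of("a", "bad", "c").forEach(s -> {
            try {
                if (s.equals("bad")) throw new IllegalArgumentException(s);
                seen.add(s);
            } catch (IllegalArgumentException ex) {
                // log / persist the bad element here, then move on
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(catchOutside()); // [a]
        System.out.println(catchInside());  // [a, c]
    }
}
```

The same distinction is why the accepted answer below moves the error handling into the deserializer instead of relying on a downstream error operator.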

Solution

As mentioned by @bsideup too, I ultimately went ahead with not throwing the exception from the deserializer, since Kafka is not able to commit the offset for that record, and because we don't have the offset information of the record (since it is malformed), there is no clean way of ignoring it and going ahead with further consumption of records. So even if I try to ignore the record using reactive error operators, the poll fetches the same record again, and the consumer is then effectively stuck.
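A minimal sketch of that approach using only the JDK — catch inside the deserialization step and return an empty result instead of letting the exception reach the Flux. SafeDeserializer and its Function-based delegate are illustrative names for this answer, not Kafka's actual Deserializer interface:

```java
import java.util.Optional;
import java.util.function.Function;

// Wraps a deserialization function so that failures yield Optional.empty()
// instead of propagating an exception that would terminate the receiver Flux.
final class SafeDeserializer<T> {
    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    Optional<T> deserialize(byte[] payload) {
        try {
            return Optional.ofNullable(delegate.apply(payload));
        } catch (RuntimeException ex) {
            // Log / persist the malformed payload here (e.g. save to Mongo),
            // then signal "no value" so the offset can still be acknowledged
            // and consumption continues with the next record.
            return Optional.empty();
        }
    }
}
```

Downstream, records that deserialize to an empty Optional can be acknowledged and skipped, which avoids the stuck-consumer situation described above.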
