Using onErrorResume to handle problematic payloads posted to Kafka using Reactor Kafka


Question

I am using Reactor Kafka to send Kafka messages, and to receive and process them. While receiving the Kafka payload, I do some deserialization, and if there is an exception, I want to just log that payload (by saving it to Mongo) and then continue receiving other payloads.

For this I am using the below approach -

@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
   for (Flux<ReceiverRecord<String, Object>> flux : kafkaService.getFluxReceives()) {
       flux.delayUntil(r -> process(r)) // some function to do something
       .doOnNext(r -> r.receiverOffset().acknowledge())
       .onErrorResume(this::handleException) // here I'll just save to Mongo
       .subscribe();
   }
}


private Publisher<? extends ReceiverRecord<String, Object>> handleException(Throwable ex) {
 // save to Mongo
 return Flux.empty();
}

Here I expect that whenever I encounter an exception while receiving a payload, onErrorResume should catch it and log it to Mongo, and then I should be able to continue receiving more messages as I send them to the Kafka queue. However, I see that after the exception, even though the onErrorResume method gets invoked, I am not able to process any more messages sent to the Kafka topic. Anything I might be missing here?

Answer

As mentioned by @bsideup too, I ultimately went ahead with not throwing an exception from the deserializer. Since Kafka is not able to commit the offset for that record, there is no clean way of ignoring the record and going ahead with further consumption: we don't have the offset information of the record (since it is malformed). So even if I try to ignore the record using reactive error operators, the next poll fetches the same record again, and the consumer is then effectively stuck.
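A minimal sketch of that idea, assuming a wrapper around the deserialization step (the `SafeDeserializer` name and callback are hypothetical, not from the original post): instead of throwing, the wrapper catches the failure, hands the raw payload to a callback (e.g. one that saves it to Mongo), and returns `null`, so the record still flows through the pipeline and its offset can be acknowledged.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical wrapper: in a real setup this logic would live inside an
// org.apache.kafka.common.serialization.Deserializer implementation, so the
// consumer never sees a deserialization exception and polling can progress.
final class SafeDeserializer<T> {
    private final Function<byte[], T> delegate;   // the real (throwing) deserializer
    private final Consumer<byte[]> onBadPayload;  // e.g. persist the raw bytes to Mongo

    SafeDeserializer(Function<byte[], T> delegate, Consumer<byte[]> onBadPayload) {
        this.delegate = delegate;
        this.onBadPayload = onBadPayload;
    }

    /** Returns the parsed value, or null if the payload is malformed. */
    T deserialize(byte[] payload) {
        try {
            return delegate.apply(payload);
        } catch (RuntimeException ex) {
            onBadPayload.accept(payload); // record the bad payload instead of failing the stream
            return null;                  // downstream filters out nulls and still acknowledges
        }
    }
}
```

Downstream, the pipeline can filter out the `null` values (or a sentinel record) while still acknowledging every offset, so the consumer keeps making progress past the malformed payload.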

