reactor-kafka相关内容

如何重试反应堆中出现故障的耗材记录-卡夫卡

我正在尝试Reactive-Kafka消费消息。其他一切都很好,但我想为失败的消息添加一个重试(2)。默认情况下,Spring-Kafka已经重试失败记录3次,我想使用Reactive-Kafka实现相同的记录。 我正在使用Spring-Kafka作为Active-Kafka的包装器。以下是我的消费者模板: reactiveKafkaConsumerTemplate ..
发布时间:2022-05-06 15:27:38 其他开发

项目反应器采用Spring集成时的主要工艺流程程序化方法

我想定义一个流,使用反应器Kafka消费Kafka并写入MongoDB,只有在成功时才将ID写入Kafka。我正在将项目反应器与Spring Integration JavaDSL结合使用,并且我希望有一个FlowBuilder类在较高级别定义我的管道。我目前有以下方向: public IntegrationFlow buildFlow() { return IntegrationFl ..

使用 onErrorResume 处理使用 Reactor Kafka 发布到 Kafka 的有问题的有效负载

我正在使用 reactor kafka 发送 kafka 消息并接收和处理它们.在接收 kakfa 负载时,我进行了一些反序列化,如果出现异常,我只想记录该负载(通过保存到 mongo ),然后继续接收其他负载. 为此,我使用以下方法 - @EventListener(ApplicationStartedEvent.class)公共无效卡夫卡接收(){for(Flux>flux: kafk ..

反序列化异常后继续消费reactor kafka中的后续记录

我正在使用 reactor kafka 并有一个自定义的 AvroDeserializer 类用于反序列化消息. 现在我有一个案例,对于某些有效负载,反序列化类会抛出异常.我的 Kafka 听众在尝试读取此类记录时立即死亡.我尝试使用 onErrorReturn 并使用 (doOnError 和 onErrorContinue) 的组合来处理此异常,但是,它有助于记录异常,但未能使用后续记录 ..
发布时间:2021-06-22 18:36:34 Java开发

使用onErrorResume处理使用Reactor Kafka发布到Kafka的有问题的有效负载

我正在使用反应堆卡夫卡发送卡夫卡消息并接收和处理它们.在接收kakfa有效负载时,我会进行反序列化,如果有异常,我想只记录该有效负载(通过保存到mongo),然后继续接收其他有效负载. 为此,我正在使用以下方法- @EventListener(ApplicationStartedEvent.class)公共无效kafkaReceiving(){for(Flux ..