Spring Reactive Stream - 意外关闭 [英] Spring Reactive Stream - Unexpected Shutdown

查看:39
本文介绍了Spring Reactive Stream - 意外关闭的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们正在将 Spring Cloud Reactive Streams 与 RabbitMQ 结合使用.

We are using Spring Cloud Reactive Streams with RabbitMQ.

Spring Reactive Stream 似乎在将消息从队列中拉出时立即确认该消息.因此,在消息处理期间发生的任何错误未处理异常都需要在应用程序中进行处理(这与非反应流不同,非反应流可以抛出未处理的异常并拒绝消息,从而将其发送到死信队列).

Spring Reactive Stream appears to acknowledge the message as soon as it pulls it off the queue. So any errors unhandled exceptions that happens during the message processing need to be handled in the application (which is a different than a non-reactive stream where unhandled exceptions can be thrown and a message would be rejected, thus sending it to a dead letter queue).

当消息正在传输时,我们应该如何处理应用程序中的突然关闭?

How are we supposed to deal with a sudden shutdown in an application when a message is in flight?

例如:

  • 应用程序从队列中提取消息
  • 应用程序将消息标记为已确认
  • 应用程序开始处理消息
  • 应用在消息处理完成前关闭

发生这种情况时,消息似乎完全丢失,因为它不在队列中,但应用程序已停止.我们如何恢复这些消息?

When this happens, the message appears to be lost completely, since it is off of the queue but the application is stopped. How can we recover these messages?

推荐答案

您需要使用手动确认并推迟确认,直到处理异步完成.为此,您需要使用整个消息:

You need to use manual acknowledgments and defer the acks until processing is asynchronously completed. To do that, you need to consume the whole message:

    @Bean
    public Consumer<Flux<Message<String>>> async() {
        return inbound -> inbound

                ...

                .map(msg -> {
                    try {
                        msg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
                                .basicAck(msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                    return msg.getPayload();
                })
                .subscribe(System.out::println);
    }

spring:
  cloud:
    stream:
      function.definition: async
      bindings:
        async-in-0:
          destination: testtock
          group: async
      rabbit:
        bindings:
          async-in-0:
            consumer:
              acknowledge-mode: MANUAL
              prefetch: 10

使用 basicReject 重新排队或发送到 DLQ.

Use basicReject to requeue or send to the DLQ.

这篇关于Spring Reactive Stream - 意外关闭的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆