Spring AMQP RabbitMq 中的计划/延迟消息传递 [英] Scheduled/Delay messaging in Spring AMQP RabbitMq

查看:56
本文介绍了Spring AMQP RabbitMq 中的计划/延迟消息传递的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在努力寻找在 Spring AMQP/Rabbit MQ 中调度/延迟消息的方式.
经过大量搜索后,我仍然无法在 Spring AMQP 中做到这一点.有人可以告诉我如何在 Spring AMQP 中执行 x-delay.
如果消费者端发生某些异常,我想延迟消息.RabbitMQ 说要添加 x-delay 并安装我已经完成的插件,但仍然没有任何延迟地立即发送消息

I am struggling hard to find out the way for scheduled/Delaying messages in Spring AMQP/Rabbit MQ.
After hell lot of searching still I am not able to do that in Spring AMQP. Can someone please tell me how to do x-delay in Spring AMQP.
I want to Delay a message if some exception occurs in the consumer side. RabbitMQ says to add x-delay and install the plugin which I have already done, but still messages is comming immediately without any delay



我收到消息
收到 <(Body:'[B@60a4ae5f(byte[26])'MessageProperties [headers={x-delay=15000}



I am getting this in message
Received <(Body:'[B@60a4ae5f(byte[26])'MessageProperties [headers={x-delay=15000}

 @Bean
ConnectionFactory connectionFactory(){

    CachingConnectionFactory connectionFactory=new CachingConnectionFactory("127.0.0.1");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setPort(1500);
    connectionFactory.setPublisherReturns(true);
    return connectionFactory;

}

@Bean
Binding binding(@Qualifier("queue")Queue queue, DirectExchange exchange) {
    return new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), queue.getName(), null);
    //return BindingBuilder.bind(queue).to(exchange).with(queueName);   
}

@Bean
DirectExchange exchange() {
    DirectExchange exchange=new DirectExchange("delay-exchange");
    return exchange;
}

消费者---
@覆盖

Consumer---
@Override

public void onMessage(Message message, Channel channel) throws Exception {

    System.out.println("Received <" + message+ ">" +rabbitTemplate);

    if(i==1){
        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
        Map<String,Object> headers = message.getMessageProperties().getHeaders();
        headers.put("x-delay", 15000);
        props.headers(headers);
        i++;
        channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),
                props.build(), message.getBody());
    }
    }

推荐答案

首先看起来你没有关注 使用 RabbitMQ 调度消息 文章:

First of all looks like you don't follow with the Scheduling Messages with RabbitMQ article:

要使用延迟消息交换,您只需要声明一个提供x-delayed-message"交换类型的交换,如下所示:

To use the Delayed Message Exchange you just need to declare an exchange providing the "x-delayed-message" exchange type as follows:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

我认为使用 Spring AMQP 也可以实现相同的效果:

I'd say the same can be achieved with the Spring AMQP:

@Bean
CustomExchange delayExchange() {
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
}

另一个问题是,您确实应该将消息发布到 delay-exchange,而不是其他任何.再说一遍:无论如何,该文档中都提到了这一点.

Another concern that you really should publish messages to that delay-exchange, not any other. Again: that is mentioned in that doc anyway.

更新

从 Spring AMQP 1.6 开始,延迟消息被支持为开箱即用的特性:https://spring.io/blog/2016/02/16/spring-amqp-1-6-0-milestone-1-and-1-5-4-available.

Since Spring AMQP 1.6 the Delayed Messages is supported as out-of-the-box feature: https://spring.io/blog/2016/02/16/spring-amqp-1-6-0-milestone-1-and-1-5-4-available.

这篇关于Spring AMQP RabbitMq 中的计划/延迟消息传递的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆