在队列尾部重新排队 Amqp 消息 [英] Re-queue Amqp message at tail of Queue

查看:34
本文介绍了在队列尾部重新排队 Amqp 消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用 Spring 和 RabbitMQ 的项目设置.目前,我的应用程序可能会收到一条 amqp 消息,该消息在另一个异步进程完成之前无法处理(遗留和完全分离,我无法控制).所以结果是我可能不得不等待处理一条消息一段时间.这样做的结果是变压器中的异常.

I have a project setup using Spring and RabbitMQ. Currently it is possible for my application to receive an amqp message that cannot be processed until another asynchronous process has completed (legacy and totally detached, i have no control). So the result is i may have to wait on processing a message for some amount of time. The result of this is an exception in a transformer.

当消息被 NACK 返回给 rabbitMQ 时,它会将它放回队列的头部并立即重新拉出它.如果我收到等于并发侦听器数量的无法处理的消息,我的工作流程就会锁定.它转动轮子等待消息变为可处理状态,即使队列中还有有效的可处理消息等待.

When the message is NACK'd back to rabbitMQ it is putting it back into the head of the queue and re-pulling it immediately. If i get unprocessable messages equal to the number of concurrent listeners my workflow locks up. It spins its wheels waiting for messages to become processable, even though there are valid processable messages waiting behind in the queue.

有没有办法拒绝和 amqp 消息并让它回到队列的尾部?根据我的研究,rabbitMQ 曾经以这种方式工作,但现在我似乎只获得了队列的头部.

Is there a way to reject and amqp message and have it go back to the tail of the queue instead? From my research rabbitMQ worked this way at one time, but now i appear to get the head of the queue exclusively.

我的配置相当简单,但为了保持连续性,它是...

My config is rather straight forward, but for continuity here it is...

连接工厂是:org.springframework.amqp.rabbit.connection.CachingConnectionFactoryRabbitMQ 3.1.1

Connection factory is: org.springframework.amqp.rabbit.connection.CachingConnectionFactory RabbitMQ 3.1.1

Spring 集成:2.2.0

Spring Integration: 2.2.0

<si:channel id="channel"/>
<si-amqp:inbound-channel-adapter
    queue-names="commit" channel="channel" connection-factory="amqpConnectionFactory"
    acknowledge-mode="AUTO" concurrent-consumers="${listeners}"
    channel-transacted="true"
    transaction-manager="transactionManager"/>

<si:chain input-channel="channel" output-channel="nullChannel">
    <si:transformer ref="transformer"></si:transformer>
    <si:service-activator ref="activator"/>
</si:chain>

推荐答案

您说得对,RabbitMQ 不久前已更改.API 中没有任何内容可以改变行为.

You are correct that RabbitMQ was changed some time ago. There is nothing in the API to change the behavior.

当然,您可以在入站适配器上放置一个 error-channel,然后是一个转换器 (expression="payload.failedMessage"),然后是一个出站适配器配置了适当的交换/路由密钥,以将消息重新排入队列后面.

You can, of course, put an error-channel on the inbound adapter, followed by a transformer (expression="payload.failedMessage"), followed by an outbound adapter configured with an appropriate exchange/routing-key to requeue the message at the back of the queue.

您可能希望在错误流中添加一些额外的逻辑来检查异常类型 (payload.cause) 并决定您想要的操作.

You might want to add some additional logic in the error flow to check the exception type (payload.cause) and decide which action you want.

如果错误流本身抛出异常,原消息会像以前一样在头部重新排队;如果它正常退出,消息将被确认.

If the error flow itself throws an exception, the original message will be requeued at the head, as before; if it exits normally, the message will be acked.

这篇关于在队列尾部重新排队 Amqp 消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆