延迟处理死信队列 [英] Handling dead letter queue with delay
问题描述
我想执行以下操作:当消息失败并落入我的死信队列时,我想等待 5 分钟并在我的队列中重新发布相同的消息.
今天,使用Spring Cloud Streams和RabbitMQ,我做了如下代码
myInput.group.dlq
绑定到 DLX
键为 myInput.group
您应该设置更长的 TTL 并检查 DLQ 中的消息以查看是否有突出显示.
编辑
我刚刚复制了您的代码,延迟了 5 秒,对我来说效果很好(关闭了主交换机的延迟).
重试消息,4 次尝试
和
添加到失败的消息队列
也许您认为它不起作用是因为您的主交易所也有延迟?
I want to do the following: when a message fails and falls to my dead letter queue, I want to wait 5 minutes and republishes the same message on my queue.
Today, using Spring Cloud Streams and RabbitMQ, I did the following code Based on this documentation:
@Component
public class HandlerDlq {
private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_DELAY_HEADER = "x-delay";
private static final int NUMBER_OF_RETRIES = 3;
private static final int DELAY_MS = 300000;
private RabbitTemplate rabbitTemplate;
@Autowired
public HandlerDlq(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = MessageInputProcessor.DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = 0;
}
if (retriesHeader > NUMBER_OF_RETRIES) {
LOGGER.warn("Message {} added to failed messages queue", failedMessage);
this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
}
retriesHeader++;
headers.put(X_RETRIES_HEADER, retriesHeader);
headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
LOGGER.warn("Retrying message, {} attempts", retriesHeader);
this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
}
@Bean
public Queue parkingLot() {
return new Queue(MessageInputProcessor.FAILED);
}
}
My MessageInputProcessor
interface:
public interface MessageInputProcessor {
String INPUT = "myInput";
String INPUT_DESTINATION = "myInput.group";
String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file
String FAILED = INPUT + "-failed";
String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";
@Input
SubscribableChannel storageManagerInput();
@Input(MessageInputProcessor.FAILED)
SubscribableChannel storageManagerFailed();
}
And my properties file:
#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000
spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true
#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group
With this code, I can read from dead letter queue, capture the header but I can't put it back to my queue (the line LOGGER.warn("Retrying message, {} attempts", retriesHeader);
only runs once, even if I put a very slow time).
My guess is that the method bindOriginalToDelay
is binding the exchange to a new queue, and not mine. However, I didn't find a way to get my queue to bind there instead of creating a new one. But I'm not even sure this is the error.
I've also tried to send to MessageInputProcessor.INPUT
instead of MessageInputProcessor.INPUT_DESTINATION
, but it didn't work as expected.
Also, unfortunately, I can't update Spring framework due to dependencies on the project...
Could you help me with putting back the failed message on my queue after some time? I really didn't want to put a thread.sleep
there...
With that configuration, myInput.group
is bound to the delayed (topic) exchange myInput
with routing key #
.
You should probably remove spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true
because you don't need the main exchange to be delayed.
It will also be bound to your explicit delayed exchange, with key myInput.group
.
Everything looks correct to me; you should see the same (single) queue bound to two exchanges:
The myInput.group.dlq
is bound to DLX
with key myInput.group
You should set a longer TTL and examine the message in the DLQ to see if something stands out.
EDIT
I just copied your code with a 5 second delay and it worked fine for me (with turning off the delay on the main exchange).
Retrying message, 4 attempts
and
added to failed messages queue
Perhaps you thought it was not working because you have a delay on the main exchange too?
这篇关于延迟处理死信队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!