Spring AMQP StatefulRetryOperationsInterceptor not used
Question
I am trying to configure Spring AMQP to retry a message only a defined number of times. Currently, a message that fails, e.g. because of a DataIntegrityViolationException, is redelivered indefinitely.
According to the documentation here, I came up with the following configuration:
    @Bean
    public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
        return RetryInterceptorBuilder.stateful()
                .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
                .maxAttempts(3)
                .messageKeyGenerator(message -> UUID.randomUUID().toString())
                .build();
    }
This does not seem to be applied: the messages are still retried indefinitely.
It feels like I am missing something here.
Here is the rest of my AMQP configuration:
    @Bean
    Queue testEventSubscriberQueue() {
        final boolean durable = true;
        return new Queue("testEventSubscriberQueue", durable);
    }

    @Bean
    Binding binding(TopicExchange topicExchange) {
        return BindingBuilder.bind(testEventSubscriberQueue()).to(topicExchange).with("payload.event-create");
    }

    @Bean
    SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(testEventSubscriberQueue().getName());
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(MessageConverter messageConverter, SubscriberHandler subscriberHandler) {
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(subscriberHandler);
        listenerAdapter.setMessageConverter(messageConverter);
        return listenerAdapter;
    }

    @Bean
    public MessageConverter messageConverter(ObjectMapper objectMapper) {
        final Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
        jsonMessageConverter.setJsonObjectMapper(objectMapper);
        DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
        defaultClassMapper.setDefaultType(EventPayload.class);
        jsonMessageConverter.setClassMapper(defaultClassMapper);
        final ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter(jsonMessageConverter);
        messageConverter.addDelgate(MessageProperties.CONTENT_TYPE_JSON, jsonMessageConverter);
        return messageConverter;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        //rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public TopicExchange testExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
    }
I am using spring-amqp 1.5.1.RELEASE.
Any help is appreciated.
Recommended answer
You need to configure the container to add the interceptor to its advice chain:
container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });
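Putting that line together with the configuration from the question, the wiring might look like the sketch below. It is a minimal sketch under stated assumptions, not a verified configuration: the class name RetryConfig is hypothetical, and the ConnectionFactory and MessageListenerAdapter beans are assumed to be defined elsewhere, as in the question. The key generator is also changed from the question's random UUID to the message's messageId, on the assumption that the publisher sets that property. Stateful retry counts attempts per key, so a key that changes on every delivery would never reach maxAttempts.

```java
import org.aopalliance.aop.Advice;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;

// Hypothetical class name; bean and queue names follow the question's configuration.
@Configuration
public class RetryConfig {

    @Bean
    public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
        return RetryInterceptorBuilder.stateful()
                .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
                .maxAttempts(3)
                // Assumption: the publisher sets messageId, so the key is identical on each redelivery.
                .messageKeyGenerator(message -> message.getMessageProperties().getMessageId())
                .build();
    }

    @Bean
    SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,
                                                            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("testEventSubscriberQueue");
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        // The step from the answer: declaring the interceptor bean alone does not apply it;
        // it has to be registered on the container's advice chain.
        container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });
        return container;
    }
}
```

If the publisher does not set messageId, some other value that stays the same across redeliveries of one message would be needed as the key.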