Spring AMQP StatefulRetryOperationsInterceptor not used

Question

I am trying to configure Spring AMQP to retry a message only a defined number of times. Currently, a message that fails, e.g. because of a DataIntegrityViolationException, is redelivered indefinitely.

According to the documentation, I came up with the following configuration:

@Bean
public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
    return RetryInterceptorBuilder.stateful()
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .maxAttempts(3)
            .messageKeyGenerator(message -> UUID.randomUUID().toString())
            .build();
}

This does not seem to be applied: the messages are still retried indefinitely.

Feels like I am missing something here.

Here is my remaining configuration regarding AMQP:

    @Bean
    Queue testEventSubscriberQueue() {
        final boolean durable = true;
        return new Queue("testEventSubscriberQueue", durable);
    }

    @Bean
    Binding binding(TopicExchange topicExchange) {
        return BindingBuilder.bind(testEventSubscriberQueue()).to(topicExchange).with("payload.event-create");
    }

    @Bean
    SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(testEventSubscriberQueue().getName());
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        return container;
    }


    @Bean
    MessageListenerAdapter listenerAdapter(MessageConverter messageConverter, SubscriberHandler subscriberHandler) {
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(subscriberHandler);
        listenerAdapter.setMessageConverter(messageConverter);
        return listenerAdapter;
    }

    @Bean
    public MessageConverter messageConverter(ObjectMapper objectMapper) {
        final Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
        jsonMessageConverter.setJsonObjectMapper(objectMapper);
        DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
        defaultClassMapper.setDefaultType(EventPayload.class);
        jsonMessageConverter.setClassMapper(defaultClassMapper);
        final ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter(jsonMessageConverter);
        messageConverter.addDelgate(MessageProperties.CONTENT_TYPE_JSON, jsonMessageConverter);
        return messageConverter;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        //rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public TopicExchange testExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
    }

I am using spring-amqp 1.5.1.RELEASE.

Any help is appreciated.

Recommended answer

You need to configure the container to add the interceptor to its advice chain...

container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });
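Once the interceptor is in the advice chain, it counts failed attempts per message key. The following is a minimal plain-Java sketch of that bookkeeping, not Spring's implementation: the class `StatefulRetrySketch`, the `Msg` type and the method names are hypothetical, back-off is left out, and the exact number of deliveries in Spring Retry may differ from this simplified model. It also illustrates one assumption worth checking in the question's configuration: attempts can only be counted if the key is the same on every redelivery, so a random key per delivery (as `UUID.randomUUID()` produces) would never reach the limit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

/** Simplified model of stateful retry bookkeeping; NOT Spring's implementation. */
final class StatefulRetrySketch {

    /** Hypothetical stand-in for an AMQP message that carries a stable id. */
    static final class Msg {
        final String id;

        Msg(String id) {
            this.id = id;
        }
    }

    private final int maxAttempts;
    private final Function<Msg, String> keyGenerator;
    // Attempt counts survive between deliveries; that is the "state" in stateful retry.
    private final Map<String, Integer> attemptsByKey = new HashMap<>();

    StatefulRetrySketch(int maxAttempts, Function<Msg, String> keyGenerator) {
        this.maxAttempts = maxAttempts;
        this.keyGenerator = keyGenerator;
    }

    /**
     * Models one delivery to a listener that always fails.
     * Returns true if the message would be requeued, false once attempts are exhausted.
     */
    boolean deliverFailing(Msg msg) {
        String key = keyGenerator.apply(msg);
        int attempts = attemptsByKey.merge(key, 1, Integer::sum);
        if (attempts >= maxAttempts) {
            attemptsByKey.remove(key); // give up: no further redelivery
            return false;
        }
        return true; // failure propagates, so the broker redelivers
    }

    /** Counts deliveries until the message is no longer requeued, capped at limit. */
    static int deliveriesUntilGivenUp(StatefulRetrySketch retry, Msg msg, int limit) {
        int deliveries = 0;
        boolean requeue = true;
        while (requeue && deliveries < limit) {
            deliveries++;
            requeue = retry.deliverFailing(msg);
        }
        return deliveries;
    }

    public static void main(String[] args) {
        Msg msg = new Msg("msg-1");
        // Stable key: the same message maps to the same counter on every redelivery.
        StatefulRetrySketch stable = new StatefulRetrySketch(3, m -> m.id);
        // Random key: every delivery looks like a brand-new message.
        StatefulRetrySketch random =
                new StatefulRetrySketch(3, m -> UUID.randomUUID().toString());

        System.out.println(deliveriesUntilGivenUp(stable, msg, 50)); // prints 3
        System.out.println(deliveriesUntilGivenUp(random, msg, 50)); // prints 50 (hits the cap)
    }
}
```

If the same effect shows up with the real interceptor, deriving the key from something stable, such as the message ID set by the publisher, may be worth trying; whether that applies here depends on how the messages are published.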
