Spring Integration 通过 errorChannel throw 重新传递与 JmsTransactionManager 不支持最大重新传递 [英] Spring Integration redelivery via errorChannel throw with JmsTransactionManager doesnt honor maximumRedeliveries

查看:84
本文介绍了Spring Integration 通过 errorChannel throw 重新传递与 JmsTransactionManager 不支持最大重新传递的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

与 SO 问题相关:Spring Integration Java DSL using JMS重试/重发

对 connectionFactory 使用事务处理轮询器和 JmsTransactionManager 并将 maximumRedeliveries 设置为 3 会导致实际重发尝试次数增加一倍.

Using a transacted poller and JmsTransactionManager on a connectionFactory with maximumRedeliveries set to 3 results in a doubling of the actual redlievery attempts.

我怎样才能让它遵守连接工厂的重新传递设置?

How can I get this to honor the redelivery settings of the connection factory?

我的 connectionFactory 构建为:

My connectionFactory is built as:

 @Bean (name="spring-int-connection-factory")
    ActiveMQConnectionFactory jmsConnectionFactory(){
        return buildConnectionFactory(
                brokerUrl,
                DELAY_2_SECS,
                MAX_REDELIVERIES,
                "spring-int");
    }

 public static ActiveMQConnectionFactory buildConnectionFactory(String brokerUrl, Long retryDelay, Integer maxRedeliveries, String clientIdPrefix){
        ActiveMQConnectionFactory amqcf = new ActiveMQConnectionFactory();
        amqcf.setBrokerURL(brokerUrl);
        amqcf.setClientIDPrefix(clientIdPrefix);
        if (maxRedeliveries != null) {
            if (retryDelay == null) {
                retryDelay = 500L;
            }
            RedeliveryPolicy rp = new org.apache.activemq.RedeliveryPolicy();
            rp.setInitialRedeliveryDelay(retryDelay);
            rp.setRedeliveryDelay(retryDelay);
            rp.setMaximumRedeliveries(maxRedeliveries);
        }
        return amqcf;
    }

我使用轮询器的流程如下:

My flow with poller is as:

@Bean
    public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {

        IntegrationFlow flow =  IntegrationFlows.from(
                Jms.inboundAdapter(connectionFactory)
                        .configureJmsTemplate(t -> t.receiveTimeout(1000).sessionTransacted(true))
                        .destination(INPUT_DIRECT_QUEUE),
                e -> e.poller(Pollers
                        .fixedDelay(5000)
                        .transactional()
                        .errorChannel("customErrorChannel")
                        .maxMessagesPerPoll(2))
        ).handle(this.msgHandler).get();

        return flow;
    }

我的 errorChannel 处理程序只是重新抛出导致 JMS 重新传递发生.

My errorChannel handler simply re-throws which causes JMS redelivery to happen.

当我在将处理程序设置为始终抛出异常的情况下运行此程序时,我看到消息处理程序实际上收到了 7 次消息(1 次初始和 6 次重新传递).

When I run this with the handler set to always throw an exception, I see that the message handler actually receives the message 7 times (1 initial and 6 redeliveries).

根据我的 connectionFactory 配置,我预计只有 3 次重新交付.

I expected only 3 redeliveries according to my connectionFactory config.

任何想法是什么导致尝试次数加倍以及如何减轻它?

Any ideas what is causing the doubling of attempts and how to mitigate it?

推荐答案

这对我来说很好用 - 在 4 点停止...

This works fine for me - stops at 4...

@SpringBootApplication
public class So51792909Application {

    private static final Logger logger = LoggerFactory.getLogger(So51792909Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So51792909Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            for (int i = 0; i < 1; i++) {
                template.convertAndSend("foo", "test");
            }
        };
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory)
                        .destination("foo"), e -> e
                            .poller(Pollers
                                    .fixedDelay(5000)
                                    .transactional()
                                    .maxMessagesPerPoll(2)))
                .handle((p, h) -> {
                    System.out.println(h.get("JMSXDeliveryCount"));
                    try {
                        Thread.sleep(2000);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                    throw new RuntimeException("foo");
                })
                .get();
    }

    @Bean
    public JmsTransactionManager transactionManager(ConnectionFactory cf) {
        return new JmsTransactionManager(cf);
    }

    @Bean
    public ActiveMQConnectionFactory amqCF() {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        RedeliveryPolicy rp = new RedeliveryPolicy();
        rp.setMaximumRedeliveries(3);
        cf.setRedeliveryPolicy(rp);
        return cf;
    }

    public CachingConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(amqCF());
    }

    @JmsListener(destination = "ActiveMQ.DLQ")
    public void listen(String in) {
        logger.info(in);
    }

}

这篇关于Spring Integration 通过 errorChannel throw 重新传递与 JmsTransactionManager 不支持最大重新传递的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆