虚假错误“无法关联响应-没有待处理的回复";使用TcpOutboundGateway和CachingClientConnectionFactory [英] Spurious error "Cannot correlate response - no pending reply" using TcpOutboundGateway and CachingClientConnectionFactory

查看:93
本文介绍了虚假错误“无法关联响应-没有待处理的回复";使用TcpOutboundGateway和CachingClientConnectionFactory的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在多线程上下文中使用TcpOutboundGateway和CachingClientConnectionFactory时,我得到了虚假的相关错误.

I am getting spurious correlation errors using TcpOutboundGateway with CachingClientConnectionFactory in a multithreaded context.

日志消息是: 2015-05-26 14:50:38.406错误3320 --- [pool-2-thread-2] o.s.i.ip.tcp.TcpOutboundGateway:无法关联响应-没有待处理的回复

The log message is: 2015-05-26 14:50:38.406 ERROR 3320 --- [pool-2-thread-2] o.s.i.ip.tcp.TcpOutboundGateway : Cannot correlate response - no pending reply

从单个线程发送时没有出现错误,并且已经测试了2台物理计算机-Windows 7和Fedora20.我正在使用Spring boot

I do not get the error when sending from a single thread, and I have tested and 2 physical machines - Windows 7 and Fedora 20. I am using Spring boot

这会导致未接收到响应的发送超时错误.

It results in a timeout error for on the send that does not recieve its response.

下面是我的简化代码: 请注意,它并不总是会产生错误-这是虚假的 该代码使用TcpOutboundGateway和TcpInboundGateway,但是在我的实际应用程序中,服务器是旧版(不是Spring)Java代码,因此我使用CachingClientConnectionFactory来提高性能

Below is my simplified code: Note it does not always produce the error - it is spurious The code Uses a TcpOutboundGateway and TcpInboundGateway, but in my actual application the server is legacy (not Spring) Java code, so I use CachingClientConnectionFactory to enhance performance

@Configuration
@ComponentScan
@EnableAutoConfiguration
public class Test {

    //**************** Client **********************************************
    @Bean
    public MessageChannel replyChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel sendChannel() {
        MessageChannel directChannel = new DirectChannel();
        return directChannel;
    }

    @Bean
    AbstractClientConnectionFactory tcpNetClientConnectionFactory() {
        AbstractClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("localhost", 9003);
        CachingClientConnectionFactory cachingClientConnectionFactory = new CachingClientConnectionFactory(tcpNetClientConnectionFactory, 4);
        return cachingClientConnectionFactory;
    }

    @Bean
    @ServiceActivator(inputChannel = "sendChannel")
    TcpOutboundGateway tcpOutboundGateway() {
        TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
        tcpOutboundGateway.setConnectionFactory(tcpNetClientConnectionFactory());
        tcpOutboundGateway.setReplyChannel(replyChannel());

        return tcpOutboundGateway;
    }
    //******************************************************************


    //**************** Server **********************************************
    @Bean
    public MessageChannel receiveChannel() {
        return new DirectChannel();
    }

    @Bean
    TcpNetServerConnectionFactory tcpNetServerConnectionFactory() {
        TcpNetServerConnectionFactory tcpNetServerConnectionFactory =  new TcpNetServerConnectionFactory(9003);
        tcpNetServerConnectionFactory.setSingleUse(false);
        return tcpNetServerConnectionFactory;
    }

    @Bean
    TcpInboundGateway tcpInboundGateway() {
        TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
        tcpInboundGateway.setConnectionFactory(tcpNetServerConnectionFactory());
        tcpInboundGateway.setRequestChannel(receiveChannel());
        return tcpInboundGateway;
    }
    //******************************************************************

    @Bean
    @Scope("prototype")
    Worker worker() {
        return new Worker();
    }

    public volatile static int lc = 4;
    public volatile static int counter = lc;
    public volatile static long totStartTime = 0;
    public volatile static int messageCount = 0;

    public static synchronized int incMessageCount(){
        return ++messageCount;
    }


    public static void main(String args[]) {
        //new LegaServer();
        ConfigurableApplicationContext applicationContext = SpringApplication.run(Test.class, args);
        totStartTime = System.currentTimeMillis();

        for (int z = 0; z < lc; z++) {
            new Thread((Worker) applicationContext.getBean("worker")).start();
        }

        try {
            Thread.currentThread().sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        applicationContext.stop();

    }
}


@MessageEndpoint
class RequestHandler {

    @ServiceActivator(inputChannel = "receiveChannel")
    public String rxHandler(byte [] in) {
        String s = new String(in);
        System.out.println("rxHandler:"+s);
        return "Blah blah " + s;
    }

}

@MessageEndpoint
class ResponseHandler {

    @ServiceActivator(inputChannel = "replyChannel")
    public void replyHandler(byte [] in) {
        System.out.println("replyHandler:"+new String(in));
    }

}

class Worker implements Runnable {

    @Autowired
    @Qualifier("sendChannel")
    MessageChannel dc;

    @Override
    public void run() {
        Test.counter--;
        int locMessageCount=0;
        long startTime = System.currentTimeMillis();
        for (int t = 0; t < 20; t++) {

            locMessageCount = Test.incMessageCount();

            Map hs = new HashMap<String, String>();
            hs.put("context", new Integer(Test.counter));

            GenericMessage message = new GenericMessage("this is a test message " + locMessageCount, hs);

            try {
                boolean sent = dc.send(message);
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("locMessageCount:"+locMessageCount);
            }

        }

        if (locMessageCount == (Test.lc*20)) {
            long totfinTime = System.currentTimeMillis();
            System.out.println("Tot. Time taken: " + (totfinTime - Test.totStartTime));
            System.out.println("Tot. TPS: " + (1000 * 20* Test.lc) / (totfinTime - Test.totStartTime));
            System.out.println("Tot. messages: " + Test.messageCount);
        }

    }
}

任何建议都将不胜感激,因为到目前为止我已经获得了帮助. TY

Any suggestions would be greatly appreciated, as is the assistance I have received so far. TY

推荐答案

谢谢;这是出站网关和缓存连接工厂组合的错误;请打开 JIRA问题.

Thanks; this is a bug with the combo of the outbound gateway and caching connection factory; please open a JIRA Issue.

问题在于,在第一个线程(线程5)删除挂起的答复之前,将连接添加回池中(并重用了).他最终删除了针对线程2的新待处理答复,而不是自己的答复.

The problem is that the connection is added back to the pool (and reused) before the first thread (Thread-5) removes the pending reply; he ends up removing the new pending reply (for Thread-2) instead of his own.

不幸的是,我没有适合您的简单解决方法;它需要更改网关中的代码以对其进行修复.

Unfortunately, I don't have a simple work-around for you; it needs code changes in the gateway to fix it.

这篇关于虚假错误“无法关联响应-没有待处理的回复";使用TcpOutboundGateway和CachingClientConnectionFactory的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆