我使用 spring 集成的 tcp 客户端无法获得响应 [英] My tcp client using spring integration not able to get response

查看:42
本文介绍了我使用 spring 集成的 tcp 客户端无法获得响应的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 spring 集成创建了 tcp 客户端,我能够收到我的发送消息的响应.但是当我使用 localDateTime.now() 记录时间时,我无法收到 send message 的响应.我知道这可以通过使用时间设置使线程等待来解决.由于我是 spring 集成的新手,所以请帮助我如何做.

I have created tcp client using spring integration I am able to receive response for my send message . But when I uses localDateTime.now() to log time I am not able to receive the response of send message . I know this can be solved using time setting to make thread wait. As I am new to spring integration So kindly help me how to do it.

@Configuration
@ComponentScan
@EnableAutoConfiguration
public class Test
{

    protected final Log logger = LogFactory.getLog(this.getClass());

    // **************** Client **********************************************
    @Bean
    public MessageChannel replyChannel()
    {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel sendChannel()
    {
        MessageChannel directChannel = new DirectChannel();
        return directChannel;
    }

    @EnableIntegration
    @IntegrationComponentScan
    @Configuration
    public static class config
    {
        @MessagingGateway(defaultRequestChannel = "sendChannel", defaultReplyChannel = "replyChannel")
        public interface Gateway
        {

            String Send(String in);

        }
    }

    @Bean
    AbstractClientConnectionFactory tcpNetClientConnectionFactory()
    {
        AbstractClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("localhost",
                9999);
        tcpNetClientConnectionFactory.setSerializer(new UCCXImprovedSerializer());
        tcpNetClientConnectionFactory.setDeserializer(new UCCXImprovedSerializer());
        tcpNetClientConnectionFactory.setSingleUse(true);

        tcpNetClientConnectionFactory.setMapper(new TcpMessageMapper());
        return tcpNetClientConnectionFactory;
    }

    @Bean
    @ServiceActivator(inputChannel = "sendChannel")
    TcpOutboundGateway tcpOutboundGateway()
    {
        TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
        tcpOutboundGateway.setConnectionFactory(tcpNetClientConnectionFactory());
        tcpOutboundGateway.setReplyChannel(replyChannel());
        return tcpOutboundGateway;
    }

    public static void main(String args[])
    {
        // new LegaServer();
        ConfigurableApplicationContext applicationContext = SpringApplication.run(Test.class, args);
        String temp = applicationContext.getBean(Gateway.class).Send("kksingh");
        System.out.println(LocalDateTime.now()+"output" + temp);

        applicationContext.stop();

    }
}

我的自定义序列化器和反序列化器 UCCXImprovedSerializerclass根据@Garry

My custom serialzer and deserialser UCCXImprovedSerializerclass after updating as per @Garry

 public class UCCXImprovedSerializer implements Serializer<String>, Deserializer<String>
{
     @Override
    public String deserialize(InputStream initialStream) throws IOException
    {

        System.out.println("deserialzier called");
        StringBuilder sb = new StringBuilder();
        try (BufferedReader rdr = new BufferedReader(new InputStreamReader(initialStream)))
        {
            for (int c; (c = rdr.read()) != -1;)
            {
                sb.append((char) c);

            }
        }
        return sb.toString();
    }

    @Override
    public void serialize(String msg, OutputStream os) throws IOException
    {
        System.out.println(msg + "---serialize---" + Thread.currentThread().getName() + "");
        os.write(msg.getBytes());
    }
   }

我的服务器在端口 9999 代码

My server at port 9999 code

   try
        {
            clientSocket = echoServer.accept();
            System.out.println("client connection established..");
            is = new DataInputStream(clientSocket.getInputStream());
            os = new PrintStream(clientSocket.getOutputStream());
            String tempString = "kksingh";
            byte[] tempStringByte = tempString.getBytes();
            byte[] temp = new byte[tempString.getBytes().length];
            while (true)
            {
                is.read(temp);
                System.out.println(new String(temp) + "--received msg is--- " + LocalDateTime.now());
                System.out.println(LocalDateTime.now() + "sending value");
                os.write(tempStringByte);
                break;
            }
        } catch (IOException e)
        {
            System.out.println(e);
        }
    }

我的 tcp 客户端日志文件

My log file for tcp client

2017-06-04 23:10:14.771  INFO 15568 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.endpoint.EventDrivenConsumer@1f12e153
kksingh---serialize---main
pool-1-thread-1---deserialize----
pool-1-thread-1---deserialize----
pool-1-thread-1---deserialize----
pool-1-thread-1---deserialize----
2017-06-04 23:10:14.812 ERROR 15568 --- [pool-1-thread-1] o.s.i.ip.tcp.TcpOutboundGateway          : Cannot correlate response - no pending reply for localhost:9999:57622:bc98ee29-8957-47bd-bd8a-f734c3ec3f9d
2017-06-04T23:10:14.809output
2017-06-04 23:10:14.821  INFO 15568 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0

我的服务器端日志文件

client connection established..
kksingh--received msg is--- 2017-06-04T23:10:14.899
2017-06-04T23:10:14.899sending value

当我从服务器和 tcpclient 中删除 localdatetime.now() 时,我能够得到作为 outputkksingh 的响应

when I removed the localdatetime.now() from server and tcpclient I am able to get response as outputkksingh

o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2017-06-05 12:46:32.494  INFO 29076 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2017-06-05 12:46:32.495  INFO 29076 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2017-06-05 12:46:32.746  INFO 29076 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2017-06-05 12:46:32.753  INFO 29076 --- [           main] o.s.i.samples.tcpclientserver.Test       : Started Test in 2.422 seconds (JVM running for 2.716)
2017-06-05 12:46:32.761  INFO 29076 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {bridge:null} as a subscriber to the 'replyChannel' channel
2017-06-05 12:46:32.762  INFO 29076 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.replyChannel' has 1 subscriber(s).
2017-06-05 12:46:32.763  INFO 29076 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.endpoint.EventDrivenConsumer@1f12e153
kksingh---serialize---main
pool-1-thread-1---deserialize----kksingh
outputkksingh
2017-06-05 12:46:32.837  INFO 29076 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0
2017-06-05 12:46:32.839  INFO 29076 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Removing {bridge:null} as a subscriber to the 'replyChannel' channel
2017-06-05 12:46:32.839  INFO 29076 --- [   

推荐答案

您的反序列化器正在反序列化多个数据包...

Your deserializer is deserializing multiple packets...

pool-1-thread-1---deserialize----
pool-1-thread-1---deserialize----
pool-1-thread-1---deserialize----
pool-1-thread-1---deserialize----

产生4条回复消息;网关只能处理一个回复,这就是您看到该 ERROR 消息的原因.

Which produces 4 reply messsages; the gateway can only handle one reply which is why you see that ERROR message.

你的反序列化器需要比仅仅捕获可用"字节更聪明.您需要消息中的某些内容来指示数据的结束(或关闭套接字以指示结束).

You deserializer needs to be smarter than just capturing "available" bytes. You need something in the message to indicate the end of the data (or close the socket to indicate the end).

这篇关于我使用 spring 集成的 tcp 客户端无法获得响应的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆