如何使用 spring 集成在 TCP 连接上实现保持活动连接? [英] How to implement a keep alive connection on a TCP connection using spring integration?

查看:55
本文介绍了如何使用 spring 集成在 TCP 连接上实现保持活动连接?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用 spring 集成 TCP 构建的 TCP 客户端,并且服务器支持保持活动消息(ping/pong 样式).连接是使用 CachingClientConnectionFactory 配置的,我想利用此服务器功能.这是我的 bean 配置:

private static final int SERIALIZER_HEADER_SIZE = 2;/*** 连接工厂用于发送和接收消息的序列化程序*/@豆角,扁豆公共 ByteArrayLengthHeaderSerializer byteArrayLengthHeaderSerializer() {返回新的 ByteArrayLengthHeaderSerializer(SERIALIZER_HEADER_SIZE);}@豆角,扁豆公共 AbstractClientConnectionFactory tcpClientConnectionFactory() {TcpNetClientConnectionFactory connFactory =新的 TcpNetClientConnectionFactory(props.getUrl(), props.getPort());connFactory.setSerializer(byteArrayLengthHeaderSerializer());connFactory.setDeserializer(byteArrayLengthHeaderSerializer());connFactory.setSoTimeout(props.getSoTimeout());如果(道具.isUseSSL()){connFactory.setTcpSocketFactorySupport(new DefaultTcpNetSSLSocketFactorySupport(() -> {返回 SSLContext.getDefault();}));}返回 connFactory;}/*** 用于创建 TCP 客户端套接字连接的连接工厂*/@豆角,扁豆公共 AbstractClientConnectionFactory tcpCachedClientConnectionFactory() {CachingClientConnectionFactory cachingConnFactory =new CachingClientConnectionFactory(tcpClientConnectionFactory(), props.getMaxPoolSize());cachingConnFactory.setConnectionWaitTimeout(props.getMaxPoolWait());返回缓存ConnFactory;}

使用此处发布的解决方案 配置保持活动状态始终保持连接处于活动状态 我可以保持连接处于打开状态,但我也想利用这些服务器保持活动状态的消息并不时发送这些消息以检查连接是否仍然有效.这可以提高客户端的性能,因为如果套接字关闭,则不需要重新连接/创建新连接.

基于此,有没有人对如何使用 spring 集成实现这一点有建议?

解决方案

当使用简单的客户端连接工厂时,使用 @InboundChannelAdapter 设置应用程序级心跳消息非常容易.

简单例子:

@SpringBootApplication公共类 So46918267Application {public static void main(String[] args) 抛出 IOException {//模拟服务器最终 ServerSocket 服务器 = ServerSocketFactory.getDefault().createServerSocket(1234);ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(() -> {尝试 {套接字 socket = server.accept();BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));字符串线;while ((line = reader.readLine()) != null) {System.out.println(line);if (line.equals("keep_alive")) {socket.getOutputStream().write("OK\r\n".getBytes());}}}捕获(IOException e){e.printStackTrace();}});ConfigurableApplicationContext context = SpringApplication.run(So46918267Application.class, args);System.out.println("按回车键终止");System.in.read();executor.shutdownNow();上下文.close();server.close();}@豆角,扁豆公共 TcpNetClientConnectionFactory 客户端(){返回新的 TcpNetClientConnectionFactory("localhost", 1234);}@ServiceActivator(inputChannel = "toTcp")@豆角,扁豆公共 TcpOutboundGateway 网关(){TcpOutboundGateway gateway = new TcpOutboundGateway();gateway.setConnectionFactory(client());返回网关;}//心跳私人最终消息<?>heartbeatMessage = MessageBuilder.withPayload("keep_alive").setReplyChannelName("heartbeatReplies").建造();@InboundChannelAdapter(channel = "toTcp", poller = @Poller(fixedDelay = "25000"))公共消息心跳(){返回 this.heartbeatMessage;}@ServiceActivator(inputChannel = "heartbeatReplies")公共无效回复(字节 [] 回复){System.out.println(new String(reply));}}

当使用 CachingClientConnectionFactory 时,不清楚为什么要保持空闲连接池打开.但是,池的工作方式是将空闲连接保留在队列中,因此每个请求都将转到最旧的连接,并将连接返回到队列的末尾.

添加 maxMessagesPerPoll 将在每次轮询时发出该数量的消息,并且...

@InboundChannelAdapter(channel = "toTcp",poller = @Poller(fixedDelay = "25000", maxMessagesPerPoll = "5"))

最多可保持 5 个连接打开.它不会打开新连接(如果至少有一个),但如果池包含 5 个或更多连接,则至少有 5 个将保持打开状态.如果没有打开的连接,它只会打开一个.

I've a TCP client which was built using spring integration TCP and the server supports a keep alive message (ping/pong style). The connections were configured using a CachingClientConnectionFactory and I'd like to take advantage on this server feature. Here's my bean configuration:

private static final int SERIALIZER_HEADER_SIZE = 2;

/**
 * Serializer used by connection factory to send and receive messages
 */
@Bean
public ByteArrayLengthHeaderSerializer byteArrayLengthHeaderSerializer() {
    return new ByteArrayLengthHeaderSerializer(SERIALIZER_HEADER_SIZE);
}

@Bean
public AbstractClientConnectionFactory tcpClientConnectionFactory() {
    TcpNetClientConnectionFactory connFactory =
        new TcpNetClientConnectionFactory(props.getUrl(), props.getPort());
    connFactory.setSerializer(byteArrayLengthHeaderSerializer());
    connFactory.setDeserializer(byteArrayLengthHeaderSerializer());
    connFactory.setSoTimeout(props.getSoTimeout());
    if (props.isUseSSL()) {
        connFactory.setTcpSocketFactorySupport(new DefaultTcpNetSSLSocketFactorySupport(() -> {
            return SSLContext.getDefault();
        }));
    }

    return connFactory;
}

/**
 * Connection factory used to create TCP client socket connections
 */
@Bean
public AbstractClientConnectionFactory tcpCachedClientConnectionFactory() {
    CachingClientConnectionFactory cachingConnFactory =
        new CachingClientConnectionFactory(tcpClientConnectionFactory(), props.getMaxPoolSize());
    cachingConnFactory.setConnectionWaitTimeout(props.getMaxPoolWait());
    return cachingConnFactory;
}

Using the solution posted here Configure keep alive to keep connection alive all the time I can keep the connection opened but I also wanted to take leverage on those server keep alive messages and send those messages from time to time to check if the connection is still alive. This can improve the performance on the client side since it won't need to re-connect/create a new connection if the socket was closed.

Based on that, does anyone has a suggestion on how to implement this using spring integration?

解决方案

When using a simple client connection factory, it's easy enough to set up application-level heartbeat messages with an @InboundChannelAdapter.

Simple example:

@SpringBootApplication
public class So46918267Application {

    public static void main(String[] args) throws IOException {
        // Simulated Server
        final ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(1234);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                Socket socket = server.accept();
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                    if (line.equals("keep_alive")) {
                        socket.getOutputStream().write("OK\r\n".getBytes());
                    }
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        });
        ConfigurableApplicationContext context = SpringApplication.run(So46918267Application.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        executor.shutdownNow();
        context.close();
        server.close();
    }

    @Bean
    public TcpNetClientConnectionFactory client() {
        return new TcpNetClientConnectionFactory("localhost", 1234);
    }

    @ServiceActivator(inputChannel = "toTcp")
    @Bean
    public TcpOutboundGateway gateway() {
        TcpOutboundGateway gateway = new TcpOutboundGateway();
        gateway.setConnectionFactory(client());
        return gateway;
    }

    // HEARTBEATS

    private final Message<?> heartbeatMessage = MessageBuilder.withPayload("keep_alive")
            .setReplyChannelName("heartbeatReplies")
            .build();

    @InboundChannelAdapter(channel = "toTcp", poller = @Poller(fixedDelay = "25000"))
    public Message<?> heartbeat() {
        return this.heartbeatMessage;
    }

    @ServiceActivator(inputChannel = "heartbeatReplies")
    public void reply(byte[] reply) {
        System.out.println(new String(reply));
    }

}

When using the CachingClientConnectionFactory, though, it's not clear why you would want to keep a pool of idle connections open. However, the way the pool works is the idle connections are kept in a queue so each request would go to the oldest connection and the connection is returned to the end of the queue.

Adding maxMessagesPerPoll would emit that number of messages on each poll and...

@InboundChannelAdapter(channel = "toTcp", 
    poller = @Poller(fixedDelay = "25000", maxMessagesPerPoll = "5"))

would keep up to 5 connections open. It won't open new connections (if there's at least one) but if the pool contains 5 or more connections, at least 5 will be kept open. If there are no open connections, it will open just one.

这篇关于如何使用 spring 集成在 TCP 连接上实现保持活动连接?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆