How to configure a TCP server to receive data from multiple clients using Spring Boot?

Problem description

I would like to configure a TCP server to receive data from, and reply to, multiple clients. I searched many other threads but could not find an exact way to do it. I'm using Spring Integration for the first time and have no experience with it.

Server requirements

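  1. It should be able to receive data from, and reply to, a specific client (there can be multiple clients, and each client should be handled separately).
  2. It should be able to send data to a client and wait for a response for a specific timeout.
  3. It should be able to detect whether a client has disconnected. If a client has disconnected, the connection should be closed to save memory. (In an earlier approach without Spring Integration, I could do this by pinging the client and seeing whether the send failed, but I don't know how to do it with Spring Integration.)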

I tried the code below, with which I'm able to send data to a client but could not achieve the requirements above.

TCP server configuration:

@Configuration
public class TcpServerConfig {

    private List<TcpConnectionOpenEvent> clientList = new ArrayList<>();

    public List<TcpConnectionOpenEvent> getClientList() {
        return clientList;
    }

    @Bean
    public TcpReceivingChannelAdapter server(TcpNetServerConnectionFactory cf) {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(cf);
        adapter.setOutputChannel(inputChannel());
        return adapter;
    }

    @Bean
    public MessageChannel inputChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    public TcpNetServerConnectionFactory cf() {
        return new TcpNetServerConnectionFactory(1001);
    }

    @Bean
    public IntegrationFlow outbound() {
        return IntegrationFlows.from(outputChannel())
                .handle(sender())
                .get();
    }

    @Bean
    public MessageHandler sender() {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(cf());
        return tcpSendingMessageHandler;
    }

    @Bean
    public ApplicationListener<TcpConnectionOpenEvent> listener() {
        return new ApplicationListener<TcpConnectionOpenEvent>() {

            @Override
            public void onApplicationEvent(TcpConnectionOpenEvent event) {
                outputChannel().send(MessageBuilder.withPayload("foo")
                        .setHeader(IpHeaders.CONNECTION_ID, event.getConnectionId())
                        .build());

                clientList.add(event);
            }
        };
    }
}

Test code:

@Service
public class Test {

    private static final Logger LOGGER = LoggerFactory.getLogger(Test.class);

    @Autowired
    TcpServerConfig tcpServerConfig;

    @Autowired
    private MessageChannel outputChannel;

    @Autowired
    private MessageChannel inputChannel;

    @Scheduled(fixedRate = 1000)
    void task() {
        LOGGER.info("Client count: " + tcpServerConfig.getClientList().size());

        for (TcpConnectionOpenEvent client : tcpServerConfig.getClientList()) {
            outputChannel.send(MessageBuilder.withPayload("foo")
                    .setHeader(IpHeaders.CONNECTION_ID, client.getConnectionId())
                    .build());
        }
    }
}

Any help would be appreciated.

Recommended answer

Here is one solution:

@SpringBootApplication
@EnableScheduling
public class So62877512ServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(So62877512ServerApplication.class, args);
    }

    @Bean
    public IntegrationFlow serverIn(Handler handler) {
        return IntegrationFlows.from(Tcp.inboundAdapter(server()))
                .transform(Transformers.objectToString())
                .filter(handler, "existingConnection", spec -> spec
                        .discardFlow(f -> f
                                .handle(handler, "sendInitialReply")))
                .handle(handler, "reply")
                .get();
    }

    @Bean
    public IntegrationFlow serverOut() {
        return f -> f.handle(Tcp.outboundAdapter(server()));
    }

    @Bean
    public TcpServerConnectionFactorySpec server() {
        return Tcp.netServer(1234)
                .serializer(TcpCodecs.lf())
                .deserializer(TcpCodecs.lf()); // compatible with netcat
    }

}

@Component
@DependsOn("serverOut")
class Handler {

    private static final Logger LOG = LoggerFactory.getLogger(Handler.class);

    private final ConcurrentMap<String, BlockingQueue<Message<?>>> clients = new ConcurrentHashMap<>();

    private final MessageChannel out;

    private final TcpNetServerConnectionFactory server;

    public Handler(@Qualifier("serverOut.input") MessageChannel out, TcpNetServerConnectionFactory server) {
        this.out = out;
        this.server = server;
    }

    public boolean existingConnection(Message<?> message) {
        String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class);
        boolean containsKey = this.clients.containsKey(connectionId);
        if (!containsKey) {
            this.clients.put(connectionId, new LinkedBlockingQueue<Message<?>>());
        }
        return containsKey;
    }

    public void sendInitialReply(Message<String> message) {
        LOG.info("Replying to " + message.getPayload());
        this.out.send(MessageBuilder.withPayload(message.getPayload().toUpperCase())
                .copyHeaders(message.getHeaders()).build());
    }

    @Scheduled(fixedDelay = 5000)
    public void sender() {
        this.clients.forEach((key, queue) -> {
            try {
                this.out.send(MessageBuilder.withPayload("foo")
                        .setHeader(IpHeaders.CONNECTION_ID, key).build());
                Message<?> reply = queue.poll(10, TimeUnit.SECONDS);
                if (reply == null) {
                    LOG.error("Timeout waiting for " + key);
                    this.server.closeConnection(key);
                }
                else {
                    LOG.info("Reply " + reply.getPayload() + " from " + key);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted");
            }
            catch (Exception e) {
                LOG.error("Failed to send to " + key, e);
            }
        });
    }

    public void reply(Message<String> in) {
        BlockingQueue<Message<?>> queue = this.clients.get(in.getHeaders().get(IpHeaders.CONNECTION_ID, String.class));
        if (queue != null) {
            queue.add(in);
        }
    }

    @EventListener
    public void closed(TcpConnectionCloseEvent event) {
        this.clients.remove(event.getConnectionId());
        LOG.info(event.getConnectionId() + " closed");
    }

}
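
The core of the Handler above is a reply queue per connection: each connection id maps to a BlockingQueue, the scheduled sender polls that queue with a timeout after writing, and a null result is treated as an unresponsive client. A minimal sketch of that pattern in plain Java, without Spring Integration, might look like the following. The class name ReplyCorrelator and all of its method names are hypothetical, chosen only for illustration; they are not part of the answer or of any library.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the queue-per-connection pattern.
class ReplyCorrelator {

    // One reply queue per connection id.
    private final ConcurrentMap<String, BlockingQueue<String>> clients = new ConcurrentHashMap<>();

    /** Registers a connection; returns true if it was already known. */
    boolean register(String connectionId) {
        return this.clients.putIfAbsent(connectionId, new LinkedBlockingQueue<>()) != null;
    }

    /** Queues an inbound payload for the given connection; returns false if the id is unknown. */
    boolean offerReply(String connectionId, String payload) {
        BlockingQueue<String> queue = this.clients.get(connectionId);
        return queue != null && queue.offer(payload);
    }

    /** Waits up to timeoutMillis for a reply; returns null on timeout, interrupt, or unknown id. */
    String awaitReply(String connectionId, long timeoutMillis) {
        BlockingQueue<String> queue = this.clients.get(connectionId);
        if (queue == null) {
            return null;
        }
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    /** Forgets a connection, for example after it has been closed. */
    void remove(String connectionId) {
        this.clients.remove(connectionId);
    }

    int size() {
        return this.clients.size();
    }
}
```

In this sketch, register uses putIfAbsent so that the check and the insert happen as one atomic step, where the answer uses containsKey followed by put. A caller would treat a null from awaitReply the way the answer treats a null from queue.poll: log the timeout and close the connection.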

$ nc localhost 1234
foo <- typed
FOO
foo
bar <- typed
foo
bar <- typed
foo

$ <- closed by server - timeout
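
The netcat session above can also be driven from Java. The sketch below assumes the server from the answer is running with LF-delimited messages, as configured through TcpCodecs.lf(), and that the client would connect to port 1234. The class LineClient and its methods are hypothetical names introduced for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical LF-delimited test client, roughly equivalent to the nc session.
class LineClient implements AutoCloseable {

    private final Socket socket;
    private final BufferedReader in;
    private final OutputStream out;

    LineClient(String host, int port) throws IOException {
        this.socket = new Socket(host, port);
        // A single reader is kept for the whole connection so buffered data is not lost.
        this.in = new BufferedReader(
                new InputStreamReader(this.socket.getInputStream(), StandardCharsets.UTF_8));
        this.out = this.socket.getOutputStream();
    }

    /** Sends one line terminated by LF. */
    void send(String line) throws IOException {
        this.out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
        this.out.flush();
    }

    /** Blocks for the next line; returns null once the server has closed the connection. */
    String readLine() throws IOException {
        return this.in.readLine();
    }

    @Override
    public void close() throws IOException {
        this.socket.close();
    }
}
```

Against the server in the answer, a client would call new LineClient("localhost", 1234), send a first line to register the connection, and then answer each periodic "foo" with send before the 10-second poll expires. When readLine returns null, the server has closed the connection, which corresponds to the final prompt in the session above.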

2020-07-14 14:41:04.906  INFO 64763 --- [pool-1-thread-2] com.example.demo.Handler                 : Replying to foo
2020-07-14 14:41:13.841  INFO 64763 --- [   scheduling-1] com.example.demo.Handler                 : Reply bar from localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153
2020-07-14 14:41:21.465  INFO 64763 --- [   scheduling-1] com.example.demo.Handler                 : Reply bar from localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153
2020-07-14 14:41:36.473 ERROR 64763 --- [   scheduling-1] com.example.demo.Handler                 : Timeout waiting for localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153
2020-07-14 14:41:36.474  INFO 64763 --- [   scheduling-1] com.example.demo.Handler                 : localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153 closed
