Spring Integration - Reliable TCP for high volume application

Question

I'm using Spring Integration for a TCP server that keeps connections open to a few thousand clients. I need the server to throttle clients under excessive load without losing messages.

My server configuration:

<task:executor id="myTaskExecutor"
    pool-size="4-8"
    queue-capacity="0"
    rejection-policy="CALLER_RUNS" />

<int-ip:tcp-connection-factory id="serverTcpConFact"
    type="server"
    port="60000"
    using-nio="true"
    single-use="false"
    so-timeout="300000"
    task-executor="myTaskExecutor" />

<int-ip:tcp-inbound-channel-adapter id="tcpInboundAdapter"
    channel="tcpInbound"
    connection-factory="serverTcpConFact" />

<channel id="tcpInbound" />

<service-activator input-channel="tcpInbound"
    ref="myService"
    method="test" />

<beans:bean id="myService" class="org.test.tcpserver.MyService" />

Since the default task executor for the connection factory is unbounded, I use a pooled task executor to prevent out-of-memory errors.

A simple client for load testing:

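import java.io.DataOutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class TCPClientTest {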
    static Socket socket;
    static List<Socket> sl = new ArrayList<>();
    static DataOutputStream out;

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10000; i++) {
            socket = new Socket("localhost", 60000);
            sl.add(socket);
            out = new DataOutputStream(socket.getOutputStream());
            out.writeBytes("connection " + i + "\r\n");
            System.out.println("Using connection #" + i);
        }
        System.in.read();
    }
}

When I run it, the server receives only about 10-20 messages, and then the client gets a "Connection refused: connect" exception. After that, the server can't accept any new connections, even after the connection timeout has elapsed. Increasing the pool size only lets a few more messages through.

Edit

I'm using Spring Integration 3.0.2.RELEASE. For production I'm using 8-40 threads, but that only makes this test fail later, after several hundred connections.

MyService.test() doesn't do much...

public class MyService {
    public void test(byte[] input) {
        System.out.println("Received: " + new String(input));
    }
}

Here is the log with trace-level logging.


Recommended answer

I see what the problem is; please open a JIRA issue.

The issue is the CALLER_RUNS rejection policy combined with a 0-length queue in the executor.

There is one thread that handles all IO events (usually myTaskExecutor-1). When a read event fires, it queues an execution to read the data; the reader thread in turn queues an execution to assemble the data. The assembler blocks until a complete message (in your case, one terminated by CRLF) arrives.
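To illustrate why an assembler ties up a thread, here is a minimal sketch of CRLF framing in plain Java. It is not the code of Spring Integration's ByteArrayCrLfSerializer; the class name CrLfFraming and the method readMessage are hypothetical and used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CrLfFraming {

    /**
     * Reads bytes until CRLF and returns the payload without the terminator.
     * On a real socket stream, read() blocks while no data is available,
     * so the calling thread is stuck until the whole message has arrived.
     */
    static byte[] readMessage(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int previous = -1;
        int current;
        while ((current = in.read()) != -1) {
            if (previous == '\r' && current == '\n') {
                byte[] withCr = buffer.toByteArray();
                // Drop the trailing '\r' buffered on the previous iteration.
                return Arrays.copyOf(withCr, withCr.length - 1);
            }
            buffer.write(current);
            previous = current;
        }
        throw new IOException("Stream closed before CRLF was seen");
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(
                "connection 0\r\nconnection 1\r\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(new String(readMessage(in), StandardCharsets.US_ASCII));
        System.out.println(new String(readMessage(in), StandardCharsets.US_ASCII));
    }
}
```

With an in-memory stream the call returns immediately; with a socket, the same loop would wait inside read() for as long as the client has not sent the terminating CRLF.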

In this case, when no threads are available, the CALLER_RUNS policy means the IO selector thread does the read itself and then becomes the assembler thread. It blocks waiting for data that will never arrive, because the selector thread is the one that would have read that data later, after handing the blocking assembler work to a different thread. Because it is blocked, it can't handle new accept events either.
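The rejection behaviour itself can be reproduced with plain java.util.concurrent, without Spring. The sketch below assumes that a task executor with queue-capacity="0" behaves like a ThreadPoolExecutor backed by a SynchronousQueue; the class CallerRunsDemo and its method are hypothetical names, and the pool is shrunk to a single thread to keep the demo deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    /** Returns the name of the thread that ran a task submitted while the pool was saturated. */
    static String threadThatRunsOverflowTask() throws InterruptedException {
        // One pool thread, no queue slots, CALLER_RUNS on rejection.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            // Occupy the only pool thread, like an assembler waiting for data.
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            // No free thread and no queue slot: the task is rejected,
            // and CallerRunsPolicy runs it on the submitting thread.
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
            return overflowThread.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Submitting thread: " + Thread.currentThread().getName());
        System.out.println("Overflow task ran on: " + threadThatRunsOverflowTask());
    }
}
```

Both lines print the same thread name. If the submitting thread is the IO selector and the overflow task is a blocking assembler, the selector stops servicing events, which matches the behaviour described above.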

Here is a log from my test showing the issue...

TRACE: [May-18 10:43:38,923][myTaskExecutor-1] tcp.connection.TcpNioServerConnectionFactory - Port 60000 SelectionCount: 2
DEBUG: [May-18 10:43:38,923][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Reading...
DEBUG: [May-18 10:43:38,924][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Running an assembler
TRACE: [May-18 10:43:38,924][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Nio message assembler running...
DEBUG: [May-18 10:43:38,926][myTaskExecutor-1] tcp.serializer.ByteArrayCrLfSerializer - Available to read:0

The second line shows the selector thread being used to do the read; it detects that an assembler is needed for this socket and becomes the assembler itself, blocking while it waits for data.

Do you really believe there will be an issue using an unbounded task executor? These events are generally pretty short-lived, so threads will be recycled pretty quickly.

Increasing the executor's queue capacity above 0 should help too, but it won't completely guarantee that the problem can't happen (although a large queue is unlikely to fill up).
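A rough way to see the effect of a non-zero queue, again in plain java.util.concurrent rather than Spring: the sketch assumes that a positive queue capacity maps to a bounded LinkedBlockingQueue, and the class QueueCapacityDemo and its method are hypothetical. It counts how many tasks fall back to the submitting thread while the single pool thread is busy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueCapacityDemo {

    /** Submits `tasks` tasks to a saturated 1-thread pool; returns how many ran on the caller. */
    static int callerRunCount(int queueCapacity, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity), // capacity must be >= 1
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        AtomicInteger ranOnCaller = new AtomicInteger();
        try {
            // Keep the only pool thread busy for the duration of the test.
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            for (int i = 0; i < tasks; i++) {
                // Queued while there is room; run on the caller once the queue is full.
                pool.execute(() -> {
                    if (Thread.currentThread() == caller) {
                        ranOnCaller.incrementAndGet();
                    }
                });
            }
            return ranOnCaller.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("capacity 2, 5 tasks:  " + callerRunCount(2, 5));
        System.out.println("capacity 10, 5 tasks: " + callerRunCount(10, 5));
    }
}
```

With a capacity of 2, three of the five tasks still run on the caller; with a capacity of 10, none do. The queue postpones the fallback but does not remove it, which is why a larger queue makes the problem less likely rather than impossible.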

I am not yet sure how to fix this, aside from using a dedicated task executor for the IO selector and reader threads so that they are never used as an assembler.
