Multithreaded client-server chat on sockets. Load test. recv failed

Question

Hello guys, I have a client-server chat and I am trying to write a load test for it. I use my own protocol, which looks like XMPP: I send XML and parse it. When I start the server, it works properly for a few users. In the load test, however, I start a lot of users and send messages from each one. In the test I do not create a new client; I only instantiate an output thread with an output stream and send messages through it. The server sends every message it receives to all users, so I create one user that listens to the other users. Sometimes I receive the exception "Software caused connection abort: recv failed". This is my console:

06:55:49 Guest 9 (online)  says : Hello Server. Message number^4
06:55:49 Guest 9 (online)  says : Hello Server. Message number^5
06:55:49 Guest 11 (online)  says : Hello Server. Message number^0
06:55:49 Guest 4 (online)  says : Hello Server. Message number^6
ERROR ServerThread - Error in reading from stream: java.net.SocketException: Software caused connection abort: recv failed
ERROR ServerThread - Error in reading from stream: java.net.SocketException: Software caused connection abort: recv failed
06:55:49 Guest 9 (online)  says : Hello Server. Message number^6

This is my ServerThread. I skipped the part where the server waits for users.

public class ServerThread implements Runnable {
    private static final Logger LOG = Logger.getLogger(ServerThread.class);
    private XMLProtocol protocol;
    private Socket socket;
    private BufferedReader input;
    private PrintWriter out;
    private static Date date;
    private String username;
    private String status = "online";
    private SimpleDateFormat dateFormat;
    private String buffer = "";
    private JAXBContext jaxbContext;
    private Unmarshaller jaxbUnmarshaller;

    public ServerThread(Socket s) throws SAXException, IOException, JAXBException {
        input = new BufferedReader(new InputStreamReader(s.getInputStream()));
        jaxbContext = JAXBContext.newInstance(XMLProtocol.class);
        out = new PrintWriter(s.getOutputStream());
        username = "Guest " + Server.getUserCounter();
        dateFormat = new SimpleDateFormat("hh:mm:ss");
        Server.addUser(username, out);
        date = new Date();
        socket = s;
        new Thread(this);
    }

    public void run() {

        try {
            while (true) {
                if (input.ready()) {
                    if (buffer.length() <= 256) {
                        if ((buffer = input.readLine()).toString().endsWith("</XMLProtocol>")) {

                            protocol = new XMLProtocol();
                            jaxbUnmarshaller = jaxbContext.createUnmarshaller();
                            protocol = (XMLProtocol) jaxbUnmarshaller.unmarshal(new StreamSource(new StringReader(buffer)));

                            switch (ChatCommands.valueOf(protocol.getStatus())) {
                            case LOGIN: {
                                Server.sendToAll(Server.buildResponce("User: " + this.username + " Has been changed nickname on "
                                        + protocol.getContent()));
                                this.username = protocol.getContent();
                                break;
                            }
                            case STATUS: {
                                Server.sendToAll(Server.buildResponce("The user: " + this.username + " Is now:" + protocol.getContent()));
                                this.status = protocol.getContent();
                                break;
                            }
                            case LOGOUT: {
                                Server.sendResponce(Server.buildResponce(ResponseCommands.DISCONNECT), out);
                                quit();
                                break;
                            }
                            default: {
                                LOG.trace("Getting message from user: " + username + " recived message: " + protocol.getContent());
                                date = Calendar.getInstance().getTime();
                                Server.sendToAll(Server.buildResponce(dateFormat.format(date.getTime()) + " " + username + " ("
                                        + this.status + ") " + " says : " + protocol.getContent()));
                                break;
                            }
                            }
                        }
                    } else {
                        Server.sendResponce(Server.buildResponce(ResponseCommands.SENDING_FAILED), out);
                    }
                }
            }

        } catch (IOException e) {
            LOG.error("Error in reading from stream: " + e);
        } catch (JAXBException e) {
            LOG.error("Error in Marshalling: " + e);
        } finally {
            try {
                Server.sendResponce(Server.buildResponce(ResponseCommands.UNEXPECTED), out);
                quit();
                LOG.trace("Socket closed");
            } catch (IOException | JAXBException e) {
                LOG.error("Socket no closed" + e);
            }
        }
    }

    public void quit() throws IOException, JAXBException {
        Server.sendToAll(Server.buildResponce("User: " + this.username + " quited"));
        Server.removeUser(out);
        socket.shutdownInput();
        socket.shutdownOutput();
        socket.close();
    }
}

This is my test:

public class ServerLoadTest {

    private static ExecutorService exec = Executors.newFixedThreadPool(1000);
    private static Socket s[] = new Socket[50];// = new Socket();

    public static void main(String[] args) throws JAXBException, UnknownHostException, IOException, InterruptedException,
            XMLStreamException, ParserConfigurationException, SAXException {
        exec.execute(new TestServerThread()); // start Server thread
        Thread.sleep(500); // wait till Server starts.

        s[0] = new Socket("localhost", 4444);

        exec.execute(new InputThread(s[0], new BufferedReader(new InputStreamReader(s[0].getInputStream())))); // Start
                                                                                                                // one
        for (int i = 0; i < 20; i++) {
            exec.execute(new TestClientThread());           
        }
    }

}

class TestClientThread implements Runnable {
    private static final Logger LOG = Logger.getLogger(TestClientThread.class);
    private XMLProtocol protocol;
    private JAXBContext jaxbContext;
    private Marshaller jaxbMarshaller;
    private Socket socket;
    private OutputStream outputStream;

    public TestClientThread() throws JAXBException, UnknownHostException, IOException, InterruptedException, XMLStreamException,
            ParserConfigurationException, SAXException {
        jaxbContext = JAXBContext.newInstance(XMLProtocol.class);
        jaxbMarshaller = jaxbContext.createMarshaller();
        socket = new Socket("localhost", 4444);
        protocol = new XMLProtocol();
        outputStream = socket.getOutputStream();

        new Thread(this);

    }

    @Override
    public void run() {
        try {

            for (int i = 0; i < 10; i++) {
                protocol.setContent("Hello Server. Message number^" + i);
                protocol.setStatus(ChatCommands.MSG.getCommandCode());
                jaxbMarshaller.marshal(protocol, outputStream);
            }
            protocol.setContent("Hello Server. Message number^");
            protocol.setStatus(ChatCommands.LOGOUT.getCommandCode());
            jaxbMarshaller.marshal(protocol, outputStream);


/*          socket.shutdownInput();
            socket.shutdownOutput();
            socket.close();
            Thread.currentThread().interrupt();*/

        } catch (JAXBException  e) {
            LOG.trace("Error in marshaling ");

        }
    }
}

class TestServerThread implements Runnable {
    private Server server;

    public TestServerThread() {
        new Thread(this);
    }

    @SuppressWarnings("static-access")
    @Override
    public void run() {
        try {
            server.main(null);
        } catch (IOException | JAXBException | ParserConfigurationException | SAXException e) {
            Assert.assertFalse(false);
        }
    }
}

Recommended answer

I got these errors in the past too. I think these errors are caused by a non-synchronized resource, but I couldn't find anything in the Javadoc that pointed me to a mistake.
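To make the "non-synchronized resource" idea concrete, here is a minimal sketch of how a shared list of client writers could be guarded. It is only an illustration: the Server class is not shown in the question, so UserRegistry and its method bodies are hypothetical stand-ins for Server.addUser, Server.removeUser and Server.sendToAll, and I cannot say that this removes the recv failed error.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the user registry inside the question's Server class. */
class UserRegistry {
    private final List<PrintWriter> writers = new ArrayList<>();

    // Every access to the shared list goes through the same lock,
    // so one thread cannot iterate while another adds or removes a user.
    synchronized void addUser(PrintWriter out) {
        writers.add(out);
    }

    synchronized void removeUser(PrintWriter out) {
        writers.remove(out);
    }

    /** Sends one line to every registered writer; returns how many writers received it. */
    synchronized int sendToAll(String message) {
        for (PrintWriter out : writers) {
            out.println(message);
            out.flush();
        }
        return writers.size();
    }
}

class UserRegistryDemo {
    public static void main(String[] args) throws InterruptedException {
        UserRegistry registry = new UserRegistry();
        StringWriter sink = new StringWriter(); // plays the role of one listening client
        registry.addUser(new PrintWriter(sink));

        // Four sender threads broadcast concurrently, as the load test does.
        Thread[] senders = new Thread[4];
        for (int t = 0; t < senders.length; t++) {
            final int id = t;
            senders[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    registry.sendToAll("sender " + id + " message " + i);
                }
            });
            senders[t].start();
        }
        for (Thread sender : senders) {
            sender.join();
        }
        System.out.println(sink.toString().split("\\R").length + " lines received");
    }
}
```

Holding the lock while writing is the simplest choice, but a slow client then delays every broadcast. A per-client outgoing queue would avoid that at the cost of more code.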

So I decided to use Jersey/Grizzly for connection handling, data encoding, etc. If you're interested, read my comment there: MultiClient / Server. Handle communications

But I would appreciate it if someone could tell us how to use plain old sockets in a heavily concurrent environment.
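As a starting point for that, here is a rough sketch of one common pattern with plain sockets: one pooled task per connection, a blocking readLine() loop instead of polling input.ready() in a tight loop, and try-with-resources so the socket is closed exactly once. LineHandler and LineServerDemo are hypothetical names, the handler echoes lines instead of unmarshalling XML, and only the port 4444 and the 256-character limit are taken from the question. It is a sketch under these assumptions, not a confirmed fix for the exception above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical per-connection handler; echoes each line instead of parsing XML. */
class LineHandler {
    static final int MAX_LINE = 256; // same limit as the buffer check in the question

    /** Owns the socket: try-with-resources closes it on EOF or on any exception. */
    static int serve(Socket socket) throws IOException {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            return serve(in, out);
        }
    }

    /** Blocks in readLine() until a line arrives; null means the peer closed the stream. */
    static int serve(BufferedReader in, PrintWriter out) throws IOException {
        int handled = 0;
        String line;
        while ((line = in.readLine()) != null) {
            if (line.length() > MAX_LINE) {
                out.println("SENDING_FAILED");
            } else {
                out.println("ECHO " + line);
                handled++;
            }
        }
        return handled; // number of lines that were accepted and echoed
    }
}

class LineServerDemo {
    public static void main(String[] args) throws IOException {
        ExecutorService pool = Executors.newCachedThreadPool();
        try (ServerSocket server = new ServerSocket(4444)) {
            while (true) {
                Socket client = server.accept();
                // One task per connection; the handler closes the socket when it returns.
                pool.execute(() -> {
                    try {
                        LineHandler.serve(client);
                    } catch (IOException e) {
                        System.err.println("Connection ended with error: " + e);
                    }
                });
            }
        }
    }
}
```

Because readLine() only returns once a line terminator or end of stream arrives, a client using this handler has to end each message with a newline and close its socket when it is done; otherwise the handler waits for more input.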
