Java: how to manage threads for reading sockets (WebSocket)?


Question

I have a WebSocket server.

My server creates a new thread to handle each new connection. The thread stays alive until the WebSocket disconnects.

My problem: for 1_000_000 connections I need 1_000_000 threads. How can I handle many WebSockets with a single thread, without blocking?

ServerSocket server;
private ExecutorService executor = new ThreadPoolExecutor(1_000_000 , 1_000_000 , 7, TimeUnit.SECONDS, queue, threadFactory);

try
{
    server = new ServerSocket(port); 
}
catch (IOException e) {}

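while (true)
{
    try
    {
        // final so the anonymous Runnable below can capture it
        final Socket client = server.accept();

        Runnable r = new Runnable()
        {
            @Override
            public void run()
            {
                // simplified; the original is a complete WebSocket implementation
                try
                {
                    client.getInputStream().read(); // blocks this thread until data arrives
                }
                catch (IOException e) {}
            }
        };

        executor.execute(r);
    }
    catch (IOException e) {}
}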

Recommended answer

Think of it this way: you have a map of sockets, and every time a message arrives at the server you get the message together with the socket it came from.

This operation is performed by the OS kernel (Linux, Windows, Unix, macOS, ...).

So you can handle a million connections with just one thread!

We call these non-blocking sockets, which means they never block your thread on a read, a write, or any other operation such as accept.

Java has a package to handle this: java.nio.*
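To make "never blocks" concrete, here is a minimal sketch of a non-blocking accept. The class and method names are made up for illustration, and binding to port 0 (let the OS pick a free port) is an assumption of this demo, not something from the answer. With no client waiting, accept() on a non-blocking ServerSocketChannel returns null right away instead of parking the thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the original answer.
public class NonBlockingAcceptDemo {

    // Returns true if accept() came back immediately with null
    // because no client was waiting to connect.
    static boolean acceptReturnsNullWhenIdle() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port (assumption for this demo).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);

            SocketChannel client = server.accept(); // does not block in non-blocking mode
            if (client != null) {
                client.close(); // unexpected here: somebody connected to our random port
                return false;
            }
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accept() returned null immediately: " + acceptReturnsNullWhenIdle());
    }
}
```

A blocking ServerSocket.accept() in the same situation would wait until a client connects, which is why the thread-per-connection design in the question needs so many threads.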

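How does it work?

  • A thread that handles the I/O operations
  • A Selector that reports which sockets have a pending operation and what type of operation it is
  • ByteBuffers to handle reads and writes, instead of the socket streams used with blocking sockets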
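The three pieces in the list above can be tried out without any network at all. The sketch below is an illustration under assumptions: it uses a java.nio.channels.Pipe as a stand-in for a socket, and the class and method names are hypothetical. The Selector reports that the pipe's source end is readable, and a ByteBuffer carries the bytes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original answer.
public class SelectorReadinessDemo {

    // Writes text into a pipe, waits for the selector to report the other
    // end as readable, and returns what was read. Assumes text fits in 100 bytes.
    static String readWhenReady(String text) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                // A channel must be non-blocking before it can be registered.
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);

                pipe.sink().write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));

                StringBuilder received = new StringBuilder();
                if (selector.select(1000) > 0) { // wait at most one second
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(100);
                            pipe.source().read(buffer); // fills the buffer (write mode)
                            buffer.flip();              // switch to read mode
                            received.append(StandardCharsets.UTF_8.decode(buffer));
                        }
                    }
                    selector.selectedKeys().clear(); // keys must be removed by hand
                }
                return received.toString();
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readWhenReady("Hello Server"));
    }
}
```

With real sockets the flow is the same; only the channel type changes from Pipe.SourceChannel to SocketChannel.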

You can also use multiple threads and selectors (each selector has its own thread).
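One possible shape for the multi-selector variant is sketched here. Everything in it is an assumption for illustration: the SelectorWorker class, its adopt method, the round-robin hand-off, and the use of pipes in place of client sockets so the demo runs without a network. Each worker owns one Selector and runs on one thread; other threads hand channels over through a queue and call wakeup() so the worker registers them itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: one instance = one selector + one thread.
public class SelectorWorker implements Runnable {

    private final Selector selector;
    private final Queue<SelectableChannel> pending = new ConcurrentLinkedQueue<>();
    final AtomicLong bytesRead = new AtomicLong();

    SelectorWorker() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Safe to call from any thread, for example the one that accepts connections.
    // The channel must support OP_READ (SocketChannel and Pipe.SourceChannel do).
    void adopt(SelectableChannel channel) {
        pending.add(channel);
        selector.wakeup(); // make select() return so the channel gets registered
    }

    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                SelectableChannel channel;
                while ((channel = pending.poll()) != null) {
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                }
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isValid() && key.isReadable()) {
                        buffer.clear();
                        int n = ((ReadableByteChannel) key.channel()).read(buffer);
                        if (n < 0) {
                            key.channel().close(); // end of stream
                        } else {
                            bytesRead.addAndGet(n); // a real server would parse the bytes here
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Network-free demo: four pipes stand in for four client connections.
    static long demoWithPipes() {
        SelectorWorker[] workers = { new SelectorWorker(), new SelectorWorker() };
        for (SelectorWorker worker : workers) {
            Thread thread = new Thread(worker);
            thread.setDaemon(true); // do not keep the JVM alive after the demo
            thread.start();
        }
        try {
            for (int i = 0; i < 4; i++) {
                Pipe pipe = Pipe.open();
                workers[i % workers.length].adopt(pipe.source()); // round-robin hand-off
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5}));
            }
            long deadline = System.currentTimeMillis() + 5000;
            long total = 0;
            while (System.currentTimeMillis() < deadline) {
                total = workers[0].bytesRead.get() + workers[1].bytesRead.get();
                if (total >= 20) break;
                Thread.sleep(10);
            }
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes read by all workers: " + demoWithPipes());
    }
}
```

In a server, the accepting thread would call adopt(socketChannel) on the next worker for every accepted connection. Registering from the worker's own thread avoids contending with a select() call that is in progress on the same selector.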

Look at this complete server example:

NoneBlockingServer.java :

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NoneBlockingServer {

public static void main(String[] args) throws Exception
{
    runServer("localhost" , 5050);
}


private final static void runServer(String host , int port)throws Exception {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(host, port));
    serverSocketChannel.configureBlocking(false); // configure as a non-blocking server socket
    SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    // register with the selector for the ACCEPT operation
    // the returned selectionKey can also be used for other things (interest ops, attachment)

    while (true) {

        int numberOfReadSockets = selector.select();
        // blocks until at least one channel is ready for an I/O operation
        // or another thread calls selector.wakeup()

        if(numberOfReadSockets==0){
            // selector.wakeup() was probably called
            // do any cross-thread synchronization work here
            continue; // continue selecting
        }

        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

        while (keys.hasNext())
        {
            SelectionKey key = keys.next();
            keys.remove(); //remove selected key from current selection !

            //handle selected key


            if(key.isValid() && key.isReadable())
            {

                //it means this socket is valid and has data to read


                SocketChannel socketChannel =
                        (SocketChannel) key.channel();

                ByteBuffer buffer = ByteBuffer.allocate(100); // allocate a 100-byte buffer
                // consider keeping one buffer per connection
                // instead of allocating a new one for every read

                int read = socketChannel.read(buffer);

                if(read<0)
                {
                    // end of stream: the peer closed the connection
                    socketChannel.close(); // closing the channel also cancels its key
                    System.out.println("CONNECTION CLOSED");
                    continue; // skip the remaining checks for this key
                }else
                {
                    buffer.flip(); // switch the buffer from writing to reading
                    byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    // decode as UTF-8, the same charset the reply is encoded with
                    String msg = new String(bytes, "UTF-8");
                    // use msg
                    System.out.println("MESSAGE : "+msg);

                    key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                    // set interestOps to READ and WRITE so the hello reply can be written back
                    key.attach(ByteBuffer.wrap("Hello Client !".getBytes("UTF-8")));
                    // wrap the reply bytes in a buffer and attach it to the selection key
                }

            }

            if(key.isValid() && key.isWritable())
            {
                // the socket is valid and has room in its send buffer

                SocketChannel socketChannel =
                        (SocketChannel) key.channel();

                // the data to write must be stored somewhere per socket,
                // here it is attached to the selection key
                ByteBuffer dataToWrite = (ByteBuffer) key.attachment();
                // key.attachment() lets you keep per-socket metadata,
                // use it wisely

                int write = socketChannel.write(dataToWrite);

                if(write<0)
                {
                    // defensive check: write() normally signals errors with an IOException
                    socketChannel.close();
                    System.out.println("CONNECTION CLOSED !"); //log
                    continue; // the socket is closed, skip the remaining checks
                }else if(!dataToWrite.hasRemaining())
                {
                    // all data has been written to the socket
                    key.interestOps(SelectionKey.OP_READ); // back to read-only interest
                }
            }

            if(key.isValid() && key.isAcceptable())
            {
                ServerSocketChannel server =
                        (ServerSocketChannel) key.channel(); // only server channels support accept

                SocketChannel socketChannel = server.accept(); // accept the new connection

                socketChannel.configureBlocking(false); // switch to non-blocking mode

                socketChannel.register(selector , SelectionKey.OP_READ);

                // you can also register for several operations at once using |
                // for example SelectionKey.OP_READ|SelectionKey.OP_WRITE
                // and change the set later with key.interestOps(int ops)


                System.out.println("NEW CONNECTION"); //log
            }

            // there is one more readiness check, key.isConnectable(),
            // used by client channels to finish a non-blocking connect




        }
    }
}
}

And here is BlockingClient.java :

import java.net.InetSocketAddress;
import java.net.Socket;

public class BlockingClient {

//using blocking sockets !
public static void main(String[] args)throws Exception
{
    Socket socket = new Socket();
    socket.connect(new InetSocketAddress("localhost" , 5050));
    socket.getOutputStream()
            .write("Hello Server".getBytes("UTF-8"));
    byte[] buffer = new byte[100];
    int len  = socket.getInputStream().read(buffer);
    System.out.println(new String(buffer , 0 , len , "UTF-8"));
    socket.close();
}
}

In this example we send a "Hello Server" message from the blocking client to the non-blocking server, and the server responds with a "Hello Client !" message.

Just run it!

Good luck
