使用线程处理套接字 [英] Using Threads to Handle Sockets

查看:108
本文介绍了使用线程处理套接字的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一个基本上是聊天室的java程序。这是一个课程的分配,所以没有代码请,我只是有一些问题确定最可行的方式来处理我需要做的事情。我已经为单个客户端设置了一个服务器程序,使用线程获取数据输入流和一个线程来处理数据输出流上的发送。我现在需要做的是为每个传入请求创建一个新线程。

I am working on a java program that is essentially a chat room. This is an assignment for class so no code please, I am just having some issues determining the most feasible way to handle what I need to do. I have a server program already setup for a single client using threads to get the data input stream and a thread to handle sending on the data output stream. What I need to do now is create a new thread for each incoming request.

我的想法是创建一个链接列表来包含客户端套接字,或者可能是线程。我磕磕绊绊的地方是弄清楚如何处理将消息发送给所有客户。如果我为每个传入消息都有一个线程,那么我该如何转身并将其发送到每个客户端套接字。

My thought is to create a linked list to contain either the client sockets, or possibly the thread. Where I am stumbling is figuring out how to handle sending the messages out to all the clients. If I have a thread for each incoming message how can I then turn around and send that out to each client socket.

我想如果我有一个客户端套件的链表,那么我可以遍历列表并将其发送给每个,但是我必须创建一个dataoutputstream每次。我可以创建dataoutputstream的链接列表吗?对不起,如果它听起来像我在漫无目的,但我不想只是开始编码,如果没有一个好的计划,它可能会变得混乱。谢谢!

I'm thinking that if I had a linkedlist of the clientsockets I could then traverse the list and send it out to each one, but then I would have to create a dataoutputstream each time. Could I create a linkedlist of dataoutputstreams? Sorry if it sounds like I'm rambling but I don't want to just start coding this, it could get messy without a good plan. Thanks!

编辑
我决定发布到目前为止的代码。我还没有机会测试它,所以任何评论都会很棒。谢谢!

EDIT I decided to post the code I have so far. I haven't had a chance to test it yet so any comments would be great. Thanks!

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class prog4_server {

    // A Queue of Strings used to hold out bound Messages
    // It blocks till on is available
    static BlockingQueue<String> outboundMessages = new LinkedBlockingQueue<String>();

    // A linked list of data output streams
    // to all the clients
    static LinkedList<DataOutputStream> outputstreams;

    // public variables to track the number of clients
    // and the state of the server
    static Boolean serverstate = true;
    static int clients = 0;

    public static void main(String[] args) throws IOException{

        //create a server socket and a clientSocket
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(6789);
        } catch (IOException e) {
            System.out.println("Could not listen on port: 6789");
            System.exit(-1);
        }// try{...}catch(IOException e){...}

        Socket clientSocket;

        // start the output thread which waits for elements
        // in the message queue
        OutputThread out = new OutputThread();
        out.start();

        while(serverstate){

            try {

                // wait and accept a new client
                // pass the socket to a new Input Thread
                clientSocket = serverSocket.accept();
                DataOutputStream ServerOut = new DataOutputStream(clientSocket.getOutputStream());
                InputThread in = new InputThread(clientSocket, clients);
                in.start();
                outputstreams.add(ServerOut);

            } catch (IOException e) {

                System.out.println("Accept failed: 6789");
                System.exit(-1);
            }// try{...}catch{..}

            // increment the number of clients and report
            clients = clients++;

            System.out.println("Client #" + clients + "Accepted");

        }//while(serverstate){...

    }//public static void main

    public static class OutputThread extends Thread {

        //OutputThread Class Constructor
        OutputThread() {
        }//OutputThread(...){...

        public void run() {

            //string variable to contain the message
            String msg = null;

            while(!this.interrupted()) {

                try {

                    msg = outboundMessages.take();

                    for(int i=0;i<outputstreams.size();i++){

                        outputstreams.get(i).writeBytes(msg + '\n');

                    }// for(...){...

                 } catch (IOException e) {

                    System.out.println(e);

                 } catch (InterruptedException e){

                     System.out.println(e);

                 }//try{...}catch{...}

            }//while(...){

        }//public void run(){...

    }// public OutputThread(){...

    public static class InputThread extends Thread {

        Boolean threadstate = true;
        BufferedReader ServerIn;
        String user;
        int threadID;
        //SocketThread Class Constructor
        InputThread(Socket clientSocket, int ID) {

            threadID = ID;

            try{
                ServerIn = new BufferedReader(
                    new InputStreamReader(clientSocket.getInputStream()));
                    user = ServerIn.readLine();
            }
            catch(IOException e){
                System.out.println(e);
            }

        }// InputThread(...){...

        public void run() {

            String msg = null;

        while (threadstate) {

                try {

                    msg = ServerIn.readLine();

                    if(msg.equals("EXITEXIT")){

                        // if the client is exiting close the thread
                        // close the output stream with the same ID
                        // and decrement the number of clients
            threadstate = false;
                        outputstreams.get(threadID).close();
                        outputstreams.remove(threadID);
                        clients = clients--;
                        if(clients == 0){
                            // if the number of clients has dropped to zero
                            // close the server
                            serverstate = false;
                            ServerIn.close();
                        }// if(clients == 0){...
                    }else{

                        // add a message to the message queue
                        outboundMessages.add(user + ": " + msg);

                    }//if..else...

                } catch (IOException e) {

                    System.out.println(e);

                }// try { ... } catch { ...}

        }// while

        }// public void run() { ...
    }

    public static class ServerThread extends Thread {

        //public variable declaration
        BufferedReader UserIn =
                new BufferedReader(new InputStreamReader(System.in));

        //OutputThread Class Constructor
        ServerThread() {

        }//OutputThread(...){...

        public void run() {

            //string variable to contain the message
            String msg = null;

            try {

                //while loop will continue until
                //exit command is received
                //then send the exit command to all clients

                msg = UserIn.readLine();

                while (!msg.equals("EXITEXIT")) {

                    System.out.println("Enter Message: ");
                    msg = UserIn.readLine();

                }//while(...){

                outboundMessages.add(msg);
                serverstate = false;
                UserIn.close();

            } catch (IOException e) {
                System.out.println(e);

            }//try{...}catch{...}


        }//public void run(){...
    }// public serverThread(){...

}// public class prog4_server


推荐答案

我过去通过为每个客户端连接定义一个 MessageHandler 类来解决这个问题,负责入站/出站消息流量。在内部,处理程序使用 BlockingQueue 实现,在其上放置出站消息(由内部工作线程)。 I / O发送方线程不断尝试从队列中读取(如果需要则阻塞)并将检索到的每条消息发送给客户端。

I have solved this problem in the past by defining a "MessageHandler" class per client connection, responsible for inbound / outbound message traffic. Internally the handler uses a BlockingQueue implementation onto which outbound messages are placed (by internal worker threads). The I/O sender thread continually attempts to read from the queue (blocking if required) and sends each message retrieved to the client.

这是一些框架示例代码(未经测试) :

Here's some skeleton example code (untested):

/**
 * Our Message definition.  A message is capable of writing itself to
 * a DataOutputStream.
 */
public interface Message {
  void writeTo(DataOutputStream daos) throws IOException;
}

/**
 * Handler definition.  The handler contains two threads: One for sending
 * and one for receiving messages.  It is initialised with an open socket.
 */    
public class MessageHandler {
  private final DataOutputStream daos;
  private final DataInputStream dais;
  private final Thread sender;
  private final Thread receiver;
  private final BlockingQueue<Message> outboundMessages = new LinkedBlockingQueue<Message>();

  public MessageHandler(Socket skt) throws IOException {
    this.daos = new DataOutputStream(skt.getOutputStream());
    this.dais = new DataInputStream(skt.getInputStream());

    // Create sender and receiver threads responsible for performing the I/O.
    this.sender = new Thread(new Runnable() {
      public void run() {
        while (!Thread.interrupted()) {
          Message msg = outboundMessages.take(); // Will block until a message is available.

          try {
            msg.writeTo(daos);
          } catch(IOException ex) {
            // TODO: Handle exception
          }
        }
      }
    }, String.format("SenderThread-%s", skt.getRemoteSocketAddress()));

    this.receiver = new Thread(new Runnable() {
      public void run() {
        // TODO: Read from DataInputStream and create inbound message.
      }
    }, String.format("ReceiverThread-%s", skt.getRemoteSocketAddress()));

    sender.start();
    receiver.start();
  }

  /**
   * Submits a message to the outbound queue, ready for sending.
   */
  public void sendOutboundMessage(Message msg) {
    outboundMessages.add(msg);
  }

  public void destroy() {
    // TODO: Interrupt and join with threads.  Close streams and socket.
  }
}

请注意,Nikolai在阻止I / O方面是正确的每个连接使用1(或2)个线程不是可扩展的解决方案,通常可以使用Java NIO编写应用程序以解决此问题。但是,实际上,除非您正在编写成千上万个客户端同时连接的企业服务器,否则这不是真正的问题。使用Java NIO编写无错误的可伸缩应用程序困难,当然不是我推荐的。

Note that Nikolai is correct in that blocking I/O using 1 (or 2) threads per connection is not a scalable solution and typically applications might be written using Java NIO to get round this. However, in reality unless you're writing an enterprise server which thousands of clients connect to simultaneously then this isn't really an issue. Writing bug-free scalable applications using Java NIO is difficult and certainly not something I'd recommend.

这篇关于使用线程处理套接字的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆