Jboss Netty-如何使用3个工作线程处理2个连接 [英] Jboss Netty - How to serve 2 connections using 3 worker threads

查看:60
本文介绍了Jboss Netty-如何使用3个工作线程处理2个连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

仅举一个简单的例子,我想只使用netty中的2个工作线程来处理3个同时的TCP客户端连接,我该怎么做?

Just as a simple example, lets say I want to handle 3 simultaneous TCP client connections using only 2 worker threads in netty, how would I do it?

问题 一种) 使用下面的代码,我的第三个连接没有从服务器获取任何数据-该连接只是位于那里.注意-我的工人遗嘱执行人和工人人数是2. 因此,如果我有2个工作线程和3个连接,那么这2个线程是否应该为这3个连接提供服务?

Questions A) With the code below, my third connection doesn't get any data from the server - the connection just sits there. Notice - how my worker executor and worker count is 2. So if I have 2 worker threads and 3 connections, shouldnt all three connections be served by the 2 threads?

B) 另一个问题是-Netty是否使用java.util.concurrent的CompletionService?它似乎没有使用它.另外,我没有看到执行executor.submit或future.get的任何源代码. 因此,所有这些加重了它如何处理和将数据提供给比其工作线程更多的连接的困惑?

B) Another question is - Does netty use CompletionService of java.util.concurrent? It doesnt seem to use it. Also, I didnt see any source code that does executor.submit or future.get So all this has added to the confusion of how it handles and serves data to connections that are MORE than its worker threads?

C) 我不知道netty如何处理10000个以上的同时TCP连接....它将创建10000个线程吗?每个连接的线程不是可伸缩的解决方案,所以我很困惑,因为我的测试代码无法按预期工作.

C) I'm lost on how netty handles 10000+ simultaneous TCP connections....will it create 10000 threads? Thread per connection is not a scalable solution, so I'm confused, because how my test code doesnt work as expected.

    import java.net.InetSocketAddress;
    import java.nio.channels.ClosedChannelException;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    import org.jboss.netty.handler.codec.string.StringEncoder;

    public class SRNGServer {

      public static void main(String[] args) throws Exception {
          // Configure the server.
          ServerBootstrap bootstrap = new ServerBootstrap(
                  new NioServerSocketChannelFactory(
                          Executors.newCachedThreadPool(),
                          //Executors.newCachedThreadPool()
                          Executors.newFixedThreadPool(2),2
                          ));

          // Configure the pipeline factory.
          bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP());

          // Bind and start to accept incoming connections.
          bootstrap.bind(new InetSocketAddress(8080));
      }



      private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler {

        private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName());


        @Override
        public void channelConnected(
                ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

          // Send greeting for a new connection.
          Channel ch=e.getChannel();

          System.out.printf("channelConnected with channel=[%s]%n", ch);

          ChannelFuture writeFuture=e.getChannel().write("It is " + new Date() + " now.\r\n");

          SRNGChannelFutureListener srngcfl=new SRNGChannelFutureListener();

          System.out.printf("Registered listener=[%s] for future=[%s]%n", srngcfl, writeFuture);

          writeFuture.addListener(srngcfl);      

        }

        @Override
        public void exceptionCaught(
                ChannelHandlerContext ctx, ExceptionEvent e) {

            logger.log(
                    Level.WARNING,
                    "Unexpected exception from downstream.",
                    e.getCause());
            if(e.getCause() instanceof ClosedChannelException){
              logger.log(Level.INFO, "****** Connection closed by client - Closing Channel");
            }
            e.getChannel().close();
        }
      }



      private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory {

        public ChannelPipeline getPipeline() throws Exception {

            // Create a default pipeline implementation.
            ChannelPipeline pipeline = Channels.pipeline();

            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new SRNGServerHandlerP());

            return pipeline;
        }
      }


      private static class SRNGChannelFutureListener implements ChannelFutureListener{

        public void operationComplete(ChannelFuture future) throws InterruptedException{
          Thread.sleep(1000*5);
          Channel ch=future.getChannel();
          if(ch!=null && ch.isConnected()){
              ChannelFuture writeFuture=ch.write("It is " + new Date() + " now.\r\n");
              //-- Add this instance as listener itself.
              writeFuture.addListener(this);
          }

        }

      }
    }

推荐答案

我没有详细分析您的源代码,所以我不知道为什么它不能正常工作.但是SRNGChannelFutureListener中的这一行看起来可疑:

I haven't analyzed your source code in detail, so I don't know exactly why it doesn't work properly. But this line in SRNGChannelFutureListener looks suspicious:

Thread.sleep(1000*5);

这将使执行它的线程锁定5秒钟;在这段时间内,该线程将无法进行任何其他处理.

This will make the thread that executes it be locked for 5 seconds; the thread will not be available to do any other processing during that time.

问题C:不,它不会创建10,000个线程; Netty的全部要点是它不会这样做,因为那的确无法很好地扩展.相反,它使用线程池中有限数量的线程,在发生任何情况时生成事件,并在池中的线​​程上运行事件处理程序.因此,线程和连接彼此分离(每个连接没有线程).

Question C: No, it will not create 10,000 threads; the whole point of Netty is that it doesn't do that, because that would indeed not scale very well. Instead, it uses a limited number of threads from a thread pool, generates events whenever something happens, and runs event handlers on the threads in the pool. So, threads and connections are decoupled from each other (there is not a thread for each connection).

要使此机制正常工作,事件处理程序应尽快返回,以使它们运行的​​线程可用于尽快运行下一个事件处理程序.如果您让一个线程休眠5秒钟,那么您将保持该线程的分配状态,因此它将无法用于处理其他事件.

To make this mechanism work properly, your event handlers should return as quickly as possible, to make the threads that they run on available for running the next event handler as quickly as possible. If you make a thread sleep for 5 seconds, then you're keeping the thread allocated, so it won't be available for handling other events.

问题B:如果您真的想知道,可以将源代码提供给Netty并进行查找.它使用选择器和其他java.nio类进行异步I/O .

Question B: If you really want to know you could get the source code to Netty and find out. It uses selectors and other java.nio classes for doing asynchronous I/O.

这篇关于Jboss Netty-如何使用3个工作线程处理2个连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆