Get OOM exception when sending messages at a high rate with Netty

Problem description

I wrote a client with Netty to send messages at a high rate. In JConsole I can see the old generation ("old gen") growing steadily, until the JVM finally throws java.lang.OutOfMemoryError: GC overhead limit exceeded. Is there a way, or a configuration setting, to avoid this exception? The following is my test code:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;

    import java.io.IOException;
    import java.net.UnknownHostException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeoutException;

    public class TestNettyTcp {
        private EventLoopGroup group;

        private Channel ch;

        /**
         * @param args
         * @throws TimeoutException
         * @throws ExecutionException
         * @throws IOException
         * @throws InterruptedException
         * @throws UnknownHostException
         */
        public static void main( String[] args )
            throws UnknownHostException, InterruptedException, IOException,  ExecutionException,  TimeoutException {
            new TestNettyTcp().testSendMessage();
        }

        public TestNettyTcp()
            throws InterruptedException {
            group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group( group ).channel( NioSocketChannel.class )
                // .option( ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024 )
                .option( ChannelOption.SO_RCVBUF, 1048576 ).option( ChannelOption.SO_SNDBUF, 1048576 )
                .option( ChannelOption.TCP_NODELAY, true ).handler( new NettyTcpSyslogSenderInitializer() );
            // Connect to a server.
            ChannelFuture future = b.connect( "192.168.22.70", 514 );
            future.awaitUninterruptibly();
            // Now we are sure the future is completed.
            assert future.isDone();

            if ( !future.isSuccess() ) {
                future.cause().printStackTrace();
            }
            else {
                ch = future.sync().channel();
            }
        }

        public void testSendMessage()
            throws InterruptedException, UnknownHostException, IOException, ExecutionException, TimeoutException {

            ThreadGroup threadGroup = new ThreadGroup( "SendMessage" );
            for ( int k = 0; k < 10; k++ ) {
                Thread thread = new Thread( threadGroup, new Runnable() {

                    @Override
                    public void run() {

                        String payLoad = "key=\"value\" key2=\"value2\" key3=\"value3\" Count:";

                        try {
                            for ( int j = 0; j < 100; j++ ) {
                                long a = System.currentTimeMillis();
                                for ( int i = 0; i < 20000; i++ ) {

                                    ch.writeAndFlush( payLoad + j + "_" + i + "\n" );
                                }
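                                    // Elapsed wall-clock time for this batch of 20000 writes.
                                    System.out.println( "Executed time: " + ( System.currentTimeMillis() - a ) / 1000f
                                        + " seconds" );
                                }

                            }
                            // No catch for InterruptedException: nothing in the try block throws it,
                            // so catching it would be a compile error.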
                        finally {
                            if ( ch != null ) {
                                ch.close();
                            }
                        }
                    }

                } );
                thread.start();
            }

            while ( threadGroup.activeCount() > 0 ) {
                try {
                    Thread.sleep( 1000 );
                }
                catch ( InterruptedException e ) {
                    e.printStackTrace();
                }
            }
        }
    }

    class NettyTcpSyslogSenderInitializer
        extends ChannelInitializer<SocketChannel> {

        public NettyTcpSyslogSenderInitializer() {
            super();
        }

        @Override
        public void initChannel( SocketChannel ch )
            throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast( new StringEncoder() );
        }
    }

This code reproduces the problem quickly.

Recommended answer

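You are writing faster than the network stack can handle. Be aware that it's all asynchronous: writeAndFlush() returns immediately, and every message that cannot be sent yet is queued in the channel's outbound buffer, which is what fills the heap. You want to stop writing once Channel.isWritable() returns false and resume once it returns true again. You can be notified of this change by overriding the channelWritabilityChanged(...) method in a ChannelInboundHandler.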
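One way to apply this to the ten sender threads in the question is a small gate that they block on while the channel is not writable. The following is only a minimal sketch under stated assumptions: `WritabilityGate` is a hypothetical helper, not a Netty class, and it uses only the JDK so that it compiles on its own. The Netty wiring is shown as a comment and assumes a Netty 4 `ChannelInboundHandlerAdapter` added to the pipeline in `initChannel`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical helper (not part of Netty): producer threads wait here while
 * the channel reports that it is not writable.
 *
 * Assumed Netty 4 wiring, shown as a comment because it needs Netty on the classpath:
 *
 *   class GateHandler extends ChannelInboundHandlerAdapter {
 *       @Override
 *       public void channelWritabilityChanged(ChannelHandlerContext ctx) {
 *           gate.setWritable(ctx.channel().isWritable());
 *           ctx.fireChannelWritabilityChanged();
 *       }
 *   }
 *
 *   // in each sender thread, before every write:
 *   gate.awaitWritable();
 *   ch.writeAndFlush(message);
 */
final class WritabilityGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition becameWritable = lock.newCondition();
    // Assumption: the gate starts open, like a freshly connected channel.
    private boolean writable = true;

    /** Records the latest writability state and wakes waiting producers if it is open. */
    void setWritable(boolean nowWritable) {
        lock.lock();
        try {
            writable = nowWritable;
            if (nowWritable) {
                becameWritable.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks the calling thread until the gate is open. Do not call this on an event loop thread. */
    void awaitWritable() throws InterruptedException {
        lock.lock();
        try {
            while (!writable) {
                becameWritable.await();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Timed variant: returns false if the gate is still closed when the timeout expires. */
    boolean awaitWritable(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!writable) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = becameWritable.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

The check and the write are not atomic, so a few messages can still be queued after the channel turns unwritable; the overshoot is bounded by the number of sender threads rather than growing without limit. The point at which isWritable() flips is governed by the channel's write buffer water marks, which is what the commented-out WRITE_BUFFER_HIGH_WATER_MARK option in the question adjusts.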
