如何在WebSockets中使用FixedChannelPool? [英] How to use FixedChannelPool with WebSockets?

查看:302
本文介绍了如何在WebSockets中使用FixedChannelPool?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

您好,其他开发人员...

Hello fellow developers...

我正在尝试实现WebSocket固定连接池,不幸的是netty为FixedChannelPool提供的指南很差,最大的所能获得的"是他们的

I am trying to implement WebSocket fixed connection pool, unfortunately netty provided poor guide for FixedChannelPool and maximum "what you can get" is their Unit Tests

我使用的客户端代码示例

Client code that I am using example Source Code

并从此帖子发现,bootstrap.handler(new ChannelInitializer<>())是被ChannelPool

然后我尝试将ChannelInitializer块移至:

public class SomeConnectionPoolHandler implements ChannelPoolHandler {

    private final URI uri = URI.create("ws://some.url:80");

    @Override
    public void channelCreated(Channel ch) {
        String protocol = uri.getScheme();
        if (!"ws".equals(protocol)) {
            throw new IllegalArgumentException("Unsupported protocol: " + protocol);
        }

        DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
        final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpClientCodec(),  new HttpObjectAggregator(65536), handler);
    }

    @Override
    public void channelReleased(Channel ch) {
        //TODO
    }

    @Override
    public void channelAcquired(Channel ch) {
        //TODO
    }
}

现在我的客户代码如下:

And now my client code looks like:

public class Application {
    private final URI uri = URI.create("ws://some.url:80");
    public static void main(String[] args) {
        Channel ch = null;
        try {
            String protocol = uri.getScheme();
            if (!"ws".equals(protocol)) {
                throw new IllegalArgumentException("Unsupported protocol: " + protocol);
            }
            Bootstrap b = new Bootstrap();
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.remoteAddress(uri.getHost(), uri.getPort());

            b.group(group);
            b.channel(NioSocketChannel.class);
            SomeConnectionPoolHandler connectionPoolHandler = new SomeConnectionPoolHandler ();

            channelPool = new FixedChannelPool(b, connectionPoolHandler, 8);
            ch = channelPool.acquire().sync().get();
            ch.writeAndFlush(new TextWebSocketFrame("test")).sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(ch != null) {
                channelPool.release(ch);
            }
        }
    }
}

现在我得到了错误:

java.lang.UnsupportedOperationException: unsupported message type: TextWebSocketFrame (expected: ByteBuf, FileRegion)
    at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:266)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:799)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1291)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.write(CombinedChannelDuplexHandler.java:528)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:101)
    at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
    at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

推荐答案

好,我找到了解决问题的方法.

Ok, I find solution to my problem.

问题是我试图在WebSocket握手完成之前将数据写入Channel.

The problem is that I was trying to write data to Channel before WebSocket Handshake is completed.

In handler source code you can see that it has method to get ChannelFuture for WebSocket Handshake.

public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

现在,从FixedChannelPool获取Channel后,我正在调用method(返回握手是否完成):

Now, after acquiring Channel from FixedChannelPool, I am calling method (returns whether handshake was completed or not):

private boolean isConnectionReady(Channel ch) {
        try {
            WebSocketClientHandler handler = (WebSocketClientHandler) ch.pipeline().get("handler");
            return handler.handshakeFuture().sync().isSuccess();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

还在channelCreated(Channel ch)

DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec(),  new HttpObjectAggregator(65536));
pipeline.addLast("handler", handler);

我知道这段代码现在看起来很糟糕,应该进行重构.

I know that this code now looks awful and should be refactored.

这篇关于如何在WebSockets中使用FixedChannelPool?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆