How to use FixedChannelPool with WebSockets?
Question
Hello fellow developers,
I am trying to implement a fixed-size WebSocket connection pool. Unfortunately, Netty provides little guidance on FixedChannelPool, and the most you can get is their unit tests.
The client code I am using is based on the example source code.
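From this post I learned that the handler set with bootstrap.handler(new ChannelInitializer<>()) is overridden by the ChannelPool.
So I tried moving the ChannelInitializer block into a ChannelPoolHandler: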
public class SomeConnectionPoolHandler implements ChannelPoolHandler {
    private final URI uri = URI.create("ws://some.url:80");

    @Override
    public void channelCreated(Channel ch) {
        String protocol = uri.getScheme();
        if (!"ws".equals(protocol)) {
            throw new IllegalArgumentException("Unsupported protocol: " + protocol);
        }
        DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
        final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(65536), handler);
    }

    @Override
    public void channelReleased(Channel ch) {
        //TODO
    }

    @Override
    public void channelAcquired(Channel ch) {
        //TODO
    }
}
My client code now looks like this:
public class Application {
    // static so that it can be used from main()
    private static final URI uri = URI.create("ws://some.url:80");

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        FixedChannelPool channelPool = null;
        Channel ch = null;
        try {
            String protocol = uri.getScheme();
            if (!"ws".equals(protocol)) {
                throw new IllegalArgumentException("Unsupported protocol: " + protocol);
            }
            Bootstrap b = new Bootstrap();
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.remoteAddress(uri.getHost(), uri.getPort());
            b.group(group);
            b.channel(NioSocketChannel.class);
            SomeConnectionPoolHandler connectionPoolHandler = new SomeConnectionPoolHandler();
            // Pool with at most 8 connections
            channelPool = new FixedChannelPool(b, connectionPoolHandler, 8);
            ch = channelPool.acquire().sync().get();
            ch.writeAndFlush(new TextWebSocketFrame("test")).sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (ch != null) {
                channelPool.release(ch);
            }
        }
    }
}
Now I get this error:
java.lang.UnsupportedOperationException: unsupported message type: TextWebSocketFrame (expected: ByteBuf, FileRegion)
    at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:266)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:799)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1291)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.write(CombinedChannelDuplexHandler.java:528)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:101)
    at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
    at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
Answer
OK, I found a solution to my problem.
The problem was that I was trying to write data to the Channel before the WebSocket handshake had completed.
In the handler source code you can see that it has a method that returns the ChannelFuture for the WebSocket handshake:
public ChannelFuture handshakeFuture() {
    return handshakeFuture;
}
Now, after acquiring a Channel from the FixedChannelPool, I call the following method, which blocks until the handshake finishes and returns whether it succeeded:
private boolean isConnectionReady(Channel ch) {
    try {
        // Look up the WebSocket handler by the name it was registered under
        WebSocketClientHandler handler = (WebSocketClientHandler) ch.pipeline().get("handler");
        return handler.handshakeFuture().sync().isSuccess();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}
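Also, in channelCreated(Channel ch), the handler is now added to the pipeline under the name "handler" so that it can be looked up later: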
DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(65536));
pipeline.addLast("handler", handler);
I know that this code looks awful now and should be refactored.
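One possible direction for such a refactoring is to stop blocking on sync() and instead defer the write until the handshake future completes. The sketch below only models that idea with the JDK's CompletableFuture; it is not Netty code, and HandshakeGate, writeNow and writeWhenReady are hypothetical names invented for illustration. With Netty itself, the analogous step would presumably be to register a listener on handler.handshakeFuture() via addListener and perform the write from that listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Simplified stand-in for a pooled WebSocket connection (hypothetical, not a Netty class). */
public class HandshakeGate {
    // Completed once the simulated WebSocket handshake has finished.
    private final CompletableFuture<Void> handshake = new CompletableFuture<>();
    // Frames that were actually written, in order.
    private final List<String> written = new ArrayList<>();

    public CompletableFuture<Void> handshakeFuture() {
        return handshake;
    }

    /** Mirrors the failure in the question: a write before the handshake is rejected. */
    public synchronized void writeNow(String frame) {
        if (!handshake.isDone() || handshake.isCompletedExceptionally()) {
            throw new IllegalStateException("handshake not completed");
        }
        written.add(frame);
    }

    /** Defers the write until the handshake completes, without blocking the caller. */
    public CompletableFuture<Void> writeWhenReady(String frame) {
        return handshake.thenRun(() -> writeNow(frame));
    }

    /** Returns a snapshot of the frames written so far. */
    public synchronized List<String> written() {
        return new ArrayList<>(written);
    }

    public static void main(String[] args) {
        HandshakeGate conn = new HandshakeGate();
        CompletableFuture<Void> pending = conn.writeWhenReady("test");
        System.out.println("written before handshake: " + conn.written());
        conn.handshakeFuture().complete(null); // simulate handshake completion
        pending.join();
        System.out.println("written after handshake: " + conn.written());
    }
}
```

In this model the caller never blocks an event loop thread: the frame is queued behind the handshake future and written as soon as that future completes successfully. If the handshake fails, the returned future completes exceptionally and nothing is written.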