How to get server response with netty client
I want to write a Netty-based client. It should have a method public String send(String msg); that returns the response from the server, or some future; either is fine. It should also be usable from multiple threads. Like this:
public class Client {
    public static void main(String[] args) throws InterruptedException {
        Client client = new Client();
    }

    private Channel channel;

    public Client() throws InterruptedException {
        EventLoopGroup loopGroup = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new StringDecoder()).
                        addLast(new StringEncoder()).
                        addLast(new ClientHandler());
            }
        });
        channel = b.connect("localhost", 9091).sync().channel();
    }

    public String sendMessage(String msg) {
        channel.writeAndFlush(msg);
        return ??????????;
    }
}
I don't understand how to retrieve the response from the server after I invoke writeAndFlush(). What should I do?
I am using Netty 4.0.18.Final.
Returning a Future<String> from the method is simple; we are going to implement the following method signature:
public Future<String> sendMessage(String msg) {
This is relatively easy to do once you are familiar with asynchronous programming structures. To solve the design problem, we are going to take the following steps:
- When a message is written, add a Promise<String> to an ArrayBlockingQueue<Promise>. This serves as a list of the messages that have recently been sent, and allows us to change the result that our Future<String> objects return.
- When a message arrives back in the handler, resolve it against the head of the Queue. This gives us the correct future to change, provided the server answers requests in the order they were sent.
- Update the state of the Promise<String>. We call promise.setSuccess() to finally set the state on the object; this propagates back to the future object.
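Stripped of Netty, the queue-of-promises idea can be sketched with JDK classes only. This is a minimal illustration, not the handler itself: PendingReplies is a hypothetical helper name, and CompletableFuture stands in for Netty's Promise<String>. It assumes the server sends exactly one reply per request, in order.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not part of Netty: pairs each reply with the oldest pending request.
final class PendingReplies {
    private final BlockingQueue<CompletableFuture<String>> pending;

    PendingReplies(int capacity) {
        pending = new ArrayBlockingQueue<>(capacity);
    }

    // Called when a request is written; the caller keeps the returned future.
    synchronized CompletableFuture<String> register() {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (!pending.offer(reply)) {
            // Queue full: reject the request instead of blocking the caller.
            reply.completeExceptionally(new IllegalStateException("too many requests in flight"));
        }
        return reply;
    }

    // Called when a reply arrives; completes the oldest pending future, if any.
    synchronized boolean complete(String response) {
        CompletableFuture<String> head = pending.poll();
        return head != null && head.complete(response);
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies(16);
        CompletableFuture<String> first = replies.register();
        CompletableFuture<String> second = replies.register();
        replies.complete("reply-1");
        replies.complete("reply-2");
        System.out.println(first.join() + ", " + second.join()); // prints "reply-1, reply-2"
    }
}
```

Because replies are matched purely by position, a protocol that can answer out of order would need a request id in each message instead of a FIFO queue.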
Example code
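import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    // Set on the event loop and read by caller threads, hence volatile.
    private volatile ChannelHandlerContext ctx;
    // Promises of requests still waiting for a response, oldest first; null once the connection is closed.
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        synchronized (this) {
            // Fail every request that is still waiting for a response.
            Exception err = new IOException("Connection lost");
            Promise<String> prom;
            while ((prom = messageList.poll()) != null) {
                prom.setFailure(err);
            }
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if (ctx == null)
            throw new IllegalStateException();
        // A ChannelPromise from ctx.newPromise() carries no String result,
        // so create a typed promise on the channel's executor.
        return sendMessage(message, ctx.executor().<String>newPromise());
    }

    public Future<String> sendMessage(String message, Promise<String> prom) {
        synchronized (this) {
            if (messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException());
            } else if (messageList.offer(prom)) {
                // Connection open and message accepted
                ctx.writeAndFlush(message);
            } else {
                // Connection open and message rejected
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }

    // This callback is named channelRead0 in Netty 4.0.x (messageReceived in the 5.0 alphas).
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        synchronized (this) {
            // Responses are matched to requests in FIFO order.
            Promise<String> prom = messageList != null ? messageList.poll() : null;
            if (prom != null) {
                prom.setSuccess(msg);
            }
        }
    }
}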
Documentation breakdown
private ChannelHandlerContext ctx;
Stores our reference to the ChannelHandlerContext; we use it to create promises.
private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);
We keep the promises of past messages in this queue so we can later change the result of the corresponding future.
public void channelActive(ChannelHandlerContext ctx)
Called by Netty when the connection becomes active. We initialize our variables here.
public void channelInactive(ChannelHandlerContext ctx)
Called by Netty when the connection becomes inactive, either due to an error or a normal connection close.
protected void channelRead0(ChannelHandlerContext ctx, String msg)
Called by Netty when a new message arrives (this callback is named channelRead0 in Netty 4.0.x and messageReceived in the 5.0 alphas). Here we pick out the head of the queue and call setSuccess on it.
Warning
When using futures, there is one thing you need to look out for: do not call get() from one of the Netty threads if the future isn't done yet. Failing to follow this simple rule will result in either a deadlock or a BlockingOperationException.
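The usual way to stay clear of this is to react to completion with a callback instead of blocking. With Netty's Future that means registering a listener through addListener(...). The sketch below shows the same pattern as a JDK-only analogy, so it runs without Netty: CompletableFuture stands in for the future returned by sendMessage, whenComplete stands in for addListener, and ReplyCallbackDemo and onReply are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// JDK-only analogy: consume a reply through a callback rather than a blocking get().
final class ReplyCallbackDemo {
    // Registers a callback that runs once the reply (or a failure) is available.
    static void onReply(CompletableFuture<String> reply, AtomicReference<String> sink) {
        reply.whenComplete((value, error) ->
                sink.set(error == null ? value : "failed: " + error.getMessage()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> reply = new CompletableFuture<>(); // stand-in for sendMessage(...)
        AtomicReference<String> sink = new AtomicReference<>();
        onReply(reply, sink);            // returns immediately, nothing blocks
        reply.complete("pong");          // stand-in for the handler calling setSuccess(msg)
        System.out.println(sink.get());  // prints "pong"
    }
}
```

Calling get() remains fine from threads that Netty does not own, such as the application's main thread, because the event loop stays free to deliver the response.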