How to get server response with netty client


Problem description


I want to write a Netty-based client. It should have a method public String send(String msg); that returns the response from the server, or some future; either is fine. It should also be multithreaded. Something like this:

public class Client {
    public static void main(String[] args) throws InterruptedException {
        Client client = new Client();
    }

    private Channel channel;

    public Client() throws InterruptedException {
        EventLoopGroup loopGroup = new NioEventLoopGroup();

        Bootstrap b = new Bootstrap();
        b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new StringDecoder()).
                        addLast(new StringEncoder()).
                        addLast(new ClientHandler());
            }
        });
        channel = b.connect("localhost", 9091).sync().channel();
    }

    public String sendMessage(String msg) {
        channel.writeAndFlush(msg);
        return ??????????;
    }
}

I don't understand how to retrieve the response from the server after I invoke writeAndFlush(). What should I do?

I am using Netty 4.0.18.Final.

Solution

Returning a Future<String> from the method is simple. We are going to implement the following method signature:

public Future<String> sendMessage(String msg)

This is relatively easy to do once you are familiar with asynchronous programming constructs. To solve the design problem, we take the following steps:

  1. When a message is written, add a Promise<String> to an ArrayBlockingQueue<Promise<String>>

    This queue serves as a list of the messages that have recently been sent, and allows us to change the result of the matching Future<String> object.

  2. When a reply arrives at the handler, match it against the head of the queue

    This gives us the correct future to complete, provided the server answers requests in the order they were sent.

  3. Update the state of the Promise<String>

    We call promise.setSuccess() to finally set the state on the object. This propagates back to the future object held by the caller.
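
The three steps above can be sketched without Netty at all. The class below is a hypothetical helper (the name PendingReplies and its methods are not from the answer or from Netty) that uses the JDK's CompletableFuture in place of Netty's Promise, assuming the server sends exactly one reply per request and keeps them in order:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper (not part of Netty): matches replies to requests in FIFO order.
final class PendingReplies {
    private final BlockingQueue<CompletableFuture<String>> pending;

    PendingReplies(int capacity) {
        this.pending = new ArrayBlockingQueue<>(capacity);
    }

    // Step 1: called when a request is written; returns the future for its reply.
    synchronized CompletableFuture<String> register() {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (!pending.offer(reply)) {
            // Queue full: reject the request instead of blocking the caller.
            reply.completeExceptionally(new IllegalStateException("too many pending requests"));
        }
        return reply;
    }

    // Steps 2 and 3: called when a reply arrives; completes the oldest pending future.
    synchronized boolean onReply(String msg) {
        CompletableFuture<String> head = pending.poll();
        if (head == null) {
            return false; // unsolicited reply, nobody is waiting for it
        }
        head.complete(msg);
        return true;
    }
}
```

A caller would invoke register() right before writing a request and onReply() from the code that receives replies. If the protocol can reorder replies, a FIFO queue is not enough and each request would need its own identifier.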

Example code

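import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    // volatile: written by the event loop, read by the threads calling sendMessage
    private volatile ChannelHandlerContext ctx;
    // Promises for requests that are still waiting for a reply, oldest first
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        synchronized(this){
            // Fail every pending promise with one shared exception
            Exception err = new IOException("Connection lost");
            Promise<String> prom;
            while((prom = messageList.poll()) != null)
                prom.setFailure(err);
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if(ctx == null)
            throw new IllegalStateException();
        // ctx.newPromise() returns a ChannelPromise (Promise<Void>), so ask the executor instead
        return sendMessage(message, ctx.executor().<String>newPromise());
    }

    public Future<String> sendMessage(String message, Promise<String> prom) {
        synchronized(this){
            if(messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException());
            } else if(messageList.offer(prom)) {
                // Connection open and message accepted.
                // Close the channel if the write fails, so channelInactive fails the pending promises
                ctx.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                // Connection open and message rejected
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }

    // Netty 4.0.x calls this method channelRead0; messageReceived is the Netty 5 alpha name
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        synchronized(this){
            if(messageList != null) {
                Promise<String> prom = messageList.poll();
                if(prom != null) // ignore replies nobody is waiting for
                    prom.setSuccess(msg);
            }
        }
    }
}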

Documentation breakdown

  • private ChannelHandlerContext ctx;

    Stores our reference to the ChannelHandlerContext, which we use to create promises and to write messages.

  • private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    We keep the promises of the messages that are still waiting for a reply in this queue, so we can set the result of the matching future later.

  • public void channelActive(ChannelHandlerContext ctx)

    Called by Netty when the connection becomes active. We initialize our variables here.

  • public void channelInactive(ChannelHandlerContext ctx)

    Called by Netty when the connection becomes inactive, either due to an error or a normal connection close. Every promise still in the queue is failed here.

  • protected void channelRead0(ChannelHandlerContext ctx, String msg) (named messageReceived in the Netty 5 alpha releases)

    Called by Netty when a new message arrives. Here we pick out the head of the queue and call setSuccess() on it.
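
The connection-close behaviour described for channelInactive can be illustrated in isolation. The following is a minimal sketch on the plain JDK, assuming a hypothetical class ClosableReplies that stands in for the handler's bookkeeping; it uses CompletableFuture where the handler uses Netty's Promise:

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the handler's bookkeeping; no Netty types involved.
final class ClosableReplies {
    // Set to null once the connection is gone, like messageList in the handler.
    private Queue<CompletableFuture<String>> pending = new ArrayDeque<>();

    synchronized CompletableFuture<String> register() {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (pending == null) {
            reply.completeExceptionally(new IllegalStateException("connection closed"));
        } else {
            pending.add(reply);
        }
        return reply;
    }

    // Mirrors channelInactive: fail everything still waiting, then refuse new requests.
    // Returns the number of futures that were failed.
    synchronized int close() {
        if (pending == null) {
            return 0; // already closed
        }
        IOException cause = new IOException("Connection lost");
        int failed = 0;
        CompletableFuture<String> reply;
        while ((reply = pending.poll()) != null) {
            reply.completeExceptionally(cause);
            failed++;
        }
        pending = null;
        return failed;
    }
}
```

Failing the pending futures matters because a caller waiting on a reply would otherwise wait forever after the connection drops.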

Warning

When using futures, there is one thing to watch out for: do not call get() from one of the Netty threads if the future is not done yet. Breaking this rule results in either a deadlock or a BlockingOperationException.
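
One way to avoid blocking is to react to the result in a callback. Netty's Future offers addListener for this; the sketch below shows the same idea with the JDK's CompletableFuture so that it runs without Netty. The class ReplyCallbacks and its method are hypothetical names used only for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical helper: reacts to a reply without ever blocking the calling thread.
final class ReplyCallbacks {
    static CompletableFuture<String> handle(CompletableFuture<String> reply,
                                            Consumer<String> onSuccess,
                                            Consumer<Throwable> onFailure) {
        // whenComplete only registers the callback; it returns immediately.
        return reply.whenComplete((msg, err) -> {
            if (err != null) {
                onFailure.accept(err);
            } else {
                onSuccess.accept(msg);
            }
        });
    }
}
```

The callback runs once the reply is available, so no thread has to wait inside get(). Blocking with get() remains fine from threads that do not belong to Netty's event loop, such as the application's main thread.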
