如何在其他类中读取netty中的消息 [英] How to read Message in netty in other class

查看:390
本文介绍了如何在其他类中读取netty中的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在 InboundHandler 以外的类中的特定位置读取消息。我无法在 channelRead0 方法中找到一种方法来读取它,该方法是从netty框架调用的。

I want to read a message at a specific position in an class other than InboundHandler. I can't find a way to read it expect in the channelRead0 method, which is called from the netty framework.

例如:

context.writeMessage("message");
String msg = context.readMessage;

如果无法做到这一点,我如何映射结果,我在<$ c中得到的结果$ c> channelRead0 我在另一个类中进行的特定调用的方法?

If this is not possible, how can I map a result, which I get in the channelRead0 method to a specific call I made in another class?

推荐答案

Netty框架旨在异步驱动。使用这个类比,它可以处理大量的连接,并且线程使用最少。我正在创建一个使用netty框架将呼叫分派到远程位置的api,你应该对你的呼叫使用相同的类比。

The Netty framework is designed to be asynchronously driven. Using this analogy, it can handle large amount of connections with minimal threading usage. I you are creating an api that uses the netty framework to dispatch calls to a remote location, you should use the same analogy for your calls.

而不是让你的api返回值为direct,使其返回 未来<?> 承诺<?> 。在您的应用程序中有不同的方法来实现此系统,最简单的方法是创建一个自定义处理程序,将传入的请求映射到FIFO队列中的 Promise

Instead of making your api return the value direct, make it return a Future<?> or a Promise<?>. There are different ways of implementing this system in your application, the simplest way is creating a custom handler that maps the incoming requests to the Promises in a FIFO queue.

这方面的一个例子如下:

An example of this could be the following:

这主要基于我在过去提交的这个答案。

This is heavily based on this answer that I submitted in the past.

我们先从开始将请求映射到我们管道中的请求的处理程序:

We start with out handler that maps the requests to requests in our pipeline:

public class MyLastHandler extends SimpleInboundHandler<String> {
    private final SynchronousQueue<Promise<String>> queue;

    public MyLastHandler (SynchronousQueue<Promise<String>> queue) {
        super();
        this.queue = queue;
    }

    // The following is called messageReceived(ChannelHandlerContext, String) in 5.0.
    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) {
        this.queue.remove().setSuccss(msg); 
        // Or setFailure(Throwable)
    }
}

然后我们需要一种将命令发送到远程服务器的方法:

We then need to have a method of sending the commands to a remote server:

Channel channel = ....;
SynchronousQueue<Promise<String>> queue = ....;

public Future<String> sendCommandAsync(String command) {
    return sendCommandAsync(command, new DefaultPromise<>());
}

public Future<String> sendCommandAsync(String command, Promise<String> promise) {
    synchronized(channel) {
        queue.offer(promise);
        channel.write(command);
    }
    channel.flush();
}

在我们完成方法后,我们需要一种方法来调用它:

After we have done our methods, we need a way to call it:

sendCommandAsync("USER anonymous", 
    new DefaultPromise<>().addListener(
        (Future<String> f) -> {
            String response = f.get();
            if (response.startWidth("331")) {
                // do something
            }
            // etc
        }
    )
);

如果被叫方希望使用我们的api作为阻止呼叫,他也可以这样做:

If the called would like to use our a api as a blocking call, he can also do that:

String response = sendCommandAsync("USER anonymous").get();
if (response.startWidth("331")) {
    // do something
}
// etc

请注意 Future.get() 可以抛出 InterruptedException 如果线程状态被中断,不像套接字读取操作,只能通过套接字上的某些交互来取消。此异常不应该是 FutureListener 中的问题。

这篇关于如何在其他类中读取netty中的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆