在无限循环中异步将客户端重新连接到服务器 [英] Asynchronously reconnecting a client to a server in an infinite loop

查看:102
本文介绍了在无限循环中异步将客户端重新连接到服务器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我无法创建尝试连接到服务器的客户端,并且:

I'm not able to create a client that tries to connect to a server and:

  • 如果服务器关闭,则必须无限循环重试
  • 如果服务器已启动且连接成功,则在连接丢失(即服务器断开客户端连接)时,客户端必须重新启动无限循环才能尝试连接到服务器

这是连接服务器的代码;当前,当连接断开时,程序退出.我不确定实现它的最佳方法是什么.也许我必须创建一个无限循环的Future?

Here's the code to connect to a server; currently when the connection is lost the program exits. I'm not sure what the best way to implement it is; maybe I have to create a Future with an infinite loop?

extern crate tokio_line;
use tokio_line::LineCodec;

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {                                                                                                                                   
    let remote_addr = "127.0.0.1:9876".parse().unwrap();                                                                                                                                                            
    let tcp = TcpStream::connect(&remote_addr, handle);                                                                                                                                                             

    let client = tcp.and_then(|stream| {                                                                                                                                                                            
        let (sink, from_server) = stream.framed(LineCodec).split();                                                                                                                                                 
        let reader = from_server.for_each(|message| {                                                                                                                                                               
            println!("{}", message);                                                                                                                                                                                
            Ok(())                                                                                                                                                                                                  
        });                                                                                                                                                                                                         

        reader.map(|_| {                                                                                                                                                                                            
            println!("CLIENT DISCONNECTED");                                                                                                                                                                        
            ()                                                                                                                                                                                                      
        }).map_err(|err| err)                                                                                                                                                                                       
    });                                                                                                                                                                                                             

    let client = client.map_err(|_| { panic!()});                                                                                                                                                                   
    Box::new(client)                                                                                                                                                                                                
}                                                                                                                                                                                                                   

fn main() {                                                                                                                                                                                                         
    let mut core = Core::new().unwrap();                                                                                                                                                                            
    let handle = core.handle();                                                                                                                                                                                     
    let client = get_connection(&handle);                                                                                                                                                                           

    let client = client.and_then(|c| {                                                                                                                                                                              
        println!("Try to reconnect");                                                                                                                                                                               
        get_connection(&handle);                                                                                                                                                                                    
        Ok(())                                                                                                                                                                                                      
    });                                                                                                                                                                                                             

    core.run(client).unwrap();                                                                                                                                                                                      
}

添加带有以下内容的tokio-line板条箱:

Add the tokio-line crate with:

tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }

推荐答案

关键问题似乎是:如何使用Tokio实现无限循环?通过回答这个问题,我们可以解决在断开连接时无限重新连接的问题. 根据我编写异步代码的经验,递归似乎是解决此问题的直接方法.

The key question seems to be: how do I implement an infinite loop using Tokio? By answering this question, we can tackle the problem of reconnecting infinitely upon disconnection. From my experience writing asynchronous code, recursion seems to be a straightforward solution to this problem.

更新:正如Shepmaster(以及Tokio Gitter的同事)所指出的那样,我的原始答案泄漏了内存,因为我们建立了随着每次迭代而增长的期货链.下面是一个新的:

UPDATE: as pointed out by Shepmaster (and the folks of the Tokio Gitter), my original answer leaks memory since we build a chain of futures that grows on each iteration. Here follows a new one:

futures包装箱中有一个功能可以完全满足您的需求.它称为 loop_fn .您可以通过将主要功能更改为以下内容来使用它:

There is a function in the futures crate that does exactly what you need. It is called loop_fn. You can use it by changing your main function to the following:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = future::loop_fn((), |_| {
        // Run the get_connection function and loop again regardless of its result
        get_connection(&handle).map(|_| -> Loop<(), ()> {
            Loop::Continue(())
        })
    });

    core.run(client).unwrap();
}

该函数类似于for循环,它可以根据get_connection的结果继续或中断(请参见

The function resembles a for loop, which can continue or break depending on the result of get_connection (see the documentation for the Loop enum). In this case, we choose to always continue, so it will infinitely keep reconnecting.

请注意,如果出现错误(例如,如果客户端无法连接到服务器),您的get_connection版本将出现恐慌.如果您还想在出错后重试,则应删除对panic!的调用.

Note that your version of get_connection will panic if there is an error (e.g. if the client cannot connect to the server). If you also want to retry after an error, you should remove the call to panic!.

如果有人发现它很有趣,请遵循我以前的回答.

Here follows my old answer, in case anyone finds it interesting.

警告:使用下面的代码会导致内存无限增长.

WARNING: using the code below results in unbounded memory growth.

我们希望在每次客户端断开连接时都调用get_connection函数,这正是我们要做的(请查看reader.and_then之后的注释):

We want to call the get_connection function each time the client is disconnected, so that is exactly what we are going to do (look at the comment after reader.and_then):

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
    let remote_addr = "127.0.0.1:9876".parse().unwrap();
    let tcp = TcpStream::connect(&remote_addr, handle);
    let handle_clone = handle.clone();

    let client = tcp.and_then(|stream| {
        let (sink, from_server) = stream.framed(LineCodec).split();
        let reader = from_server.for_each(|message| {
            println!("{}", message);
            Ok(())
        });

        reader.and_then(move |_| {
            println!("CLIENT DISCONNECTED");
            // Attempt to reconnect in the future
            get_connection(&handle_clone)
        })
    });

    let client = client.map_err(|_| { panic!()});
    Box::new(client)
}

请记住,get_connection是非阻塞的.它只是构造一个Box<Future>.这意味着当递归调用它时,我们仍然不会阻塞.相反,我们得到了一个新的未来,可以使用and_then链接到前一个.如您所见,这与普通递归不同,因为堆栈不会在每次迭代时都增长.

Remember that get_connection is non-blocking. It just constructs a Box<Future>. This means that when calling it recursively, we still don't block. Instead, we get a new future, which we can link to the previous one by using and_then. As you can see, this is different to normal recursion since the stack doesn't grow on each iteration.

请注意,我们需要克隆handle(请参阅handle_clone),并将其移至传递给reader.and_then的闭包中.这是必要的,因为闭包的寿命比该函数的寿命长(它将在以后的返回中包含).

Note that we need to clone the handle (see handle_clone), and move it into the closure passed to reader.and_then. This is necessary because the closure is going to live longer than the function (it will be contained in the future we are returning).

您提供的代码无法处理客户端无法连接到服务器的情况(也不存在任何其他错误).按照上面显示的相同原理,我们可以通过将get_connection的结尾更改为以下内容来处理错误:

The code you provided doesn't handle the case in which the client is unable to connect to the server (nor any other errors). Following the same principle shown above, we can handle errors by changing the end of get_connection to the following:

let handle_clone = handle.clone();
let client = client.or_else(move |err| {
    // Note: this code will infinitely retry, but you could pattern match on the error
    // to retry only on certain kinds of error
    println!("Error connecting to server: {}", err);
    get_connection(&handle_clone)
});
Box::new(client)

请注意,or_elseand_then类似,但是它会处理将来产生的错误.

Note that or_else is like and_then, but it operates on the error produced by the future.

最后,没有必要在main函数中使用and_then.您可以将main替换为以下代码:

Finally, it is not necessary to use and_then in the main function. You can replace your main by the following code:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = get_connection(&handle);
    core.run(client).unwrap();
}

这篇关于在无限循环中异步将客户端重新连接到服务器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆