在无限循环中异步将客户端重新连接到服务器 [英] Asynchronously reconnecting a client to a server in an infinite loop
问题描述
我无法创建尝试连接到服务器的客户端,并且:
I'm not able to create a client that tries to connect to a server and:
- 如果服务器关闭,则必须无限循环重试
- 如果服务器已启动且连接成功,则在连接丢失(即服务器断开客户端连接)时,客户端必须重新启动无限循环才能尝试连接到服务器
这是连接服务器的代码;当前,当连接断开时,程序退出.我不确定实现它的最佳方法是什么.也许我必须创建一个无限循环的Future
?
Here's the code to connect to a server; currently when the connection is lost the program exits. I'm not sure what the best way to implement it is; maybe I have to create a Future
with an infinite loop?
extern crate tokio_line;
use tokio_line::LineCodec;
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.map(|_| {
println!("CLIENT DISCONNECTED");
()
}).map_err(|err| err)
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
let client = client.and_then(|c| {
println!("Try to reconnect");
get_connection(&handle);
Ok(())
});
core.run(client).unwrap();
}
添加带有以下内容的tokio-line板条箱:
Add the tokio-line crate with:
tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }
推荐答案
关键问题似乎是:如何使用Tokio实现无限循环?通过回答这个问题,我们可以解决在断开连接时无限重新连接的问题. 根据我编写异步代码的经验,递归似乎是解决此问题的直接方法.
The key question seems to be: how do I implement an infinite loop using Tokio? By answering this question, we can tackle the problem of reconnecting infinitely upon disconnection. From my experience writing asynchronous code, recursion seems to be a straightforward solution to this problem.
更新:正如Shepmaster(以及Tokio Gitter的同事)所指出的那样,我的原始答案泄漏了内存,因为我们建立了随着每次迭代而增长的期货链.下面是一个新的:
UPDATE: as pointed out by Shepmaster (and the folks of the Tokio Gitter), my original answer leaks memory since we build a chain of futures that grows on each iteration. Here follows a new one:
futures
包装箱中有一个功能可以完全满足您的需求.它称为 loop_fn
.您可以通过将主要功能更改为以下内容来使用它:
There is a function in the futures
crate that does exactly what you need. It is called loop_fn
. You can use it by changing your main function to the following:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = future::loop_fn((), |_| {
// Run the get_connection function and loop again regardless of its result
get_connection(&handle).map(|_| -> Loop<(), ()> {
Loop::Continue(())
})
});
core.run(client).unwrap();
}
该函数类似于for循环,它可以根据get_connection
的结果继续或中断(请参见
The function resembles a for loop, which can continue or break depending on the result of get_connection
(see the documentation for the Loop
enum). In this case, we choose to always continue, so it will infinitely keep reconnecting.
请注意,如果出现错误(例如,如果客户端无法连接到服务器),您的get_connection
版本将出现恐慌.如果您还想在出错后重试,则应删除对panic!
的调用.
Note that your version of get_connection
will panic if there is an error (e.g. if the client cannot connect to the server). If you also want to retry after an error, you should remove the call to panic!
.
如果有人发现它很有趣,请遵循我以前的回答.
Here follows my old answer, in case anyone finds it interesting.
警告:使用下面的代码会导致内存无限增长.
WARNING: using the code below results in unbounded memory growth.
我们希望在每次客户端断开连接时都调用get_connection
函数,这正是我们要做的(请查看reader.and_then
之后的注释):
We want to call the get_connection
function each time the client is disconnected, so that is exactly what we are going to do (look at the comment after reader.and_then
):
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let handle_clone = handle.clone();
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.and_then(move |_| {
println!("CLIENT DISCONNECTED");
// Attempt to reconnect in the future
get_connection(&handle_clone)
})
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
请记住,get_connection
是非阻塞的.它只是构造一个Box<Future>
.这意味着当递归调用它时,我们仍然不会阻塞.相反,我们得到了一个新的未来,可以使用and_then
链接到前一个.如您所见,这与普通递归不同,因为堆栈不会在每次迭代时都增长.
Remember that get_connection
is non-blocking. It just constructs a Box<Future>
. This means that when calling it recursively, we still don't block. Instead, we get a new future, which we can link to the previous one by using and_then
. As you can see, this is different to normal recursion since the stack doesn't grow on each iteration.
请注意,我们需要克隆handle
(请参阅handle_clone
),并将其移至传递给reader.and_then
的闭包中.这是必要的,因为闭包的寿命比该函数的寿命长(它将在以后的返回中包含).
Note that we need to clone the handle
(see handle_clone
), and move it into the closure passed to reader.and_then
. This is necessary because the closure is going to live longer than the function (it will be contained in the future we are returning).
您提供的代码无法处理客户端无法连接到服务器的情况(也不存在任何其他错误).按照上面显示的相同原理,我们可以通过将get_connection
的结尾更改为以下内容来处理错误:
The code you provided doesn't handle the case in which the client is unable to connect to the server (nor any other errors). Following the same principle shown above, we can handle errors by changing the end of get_connection
to the following:
let handle_clone = handle.clone();
let client = client.or_else(move |err| {
// Note: this code will infinitely retry, but you could pattern match on the error
// to retry only on certain kinds of error
println!("Error connecting to server: {}", err);
get_connection(&handle_clone)
});
Box::new(client)
请注意,or_else
与and_then
类似,但是它会处理将来产生的错误.
Note that or_else
is like and_then
, but it operates on the error produced by the future.
最后,没有必要在main
函数中使用and_then
.您可以将main
替换为以下代码:
Finally, it is not necessary to use and_then
in the main
function. You can replace your main
by the following code:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
core.run(client).unwrap();
}
这篇关于在无限循环中异步将客户端重新连接到服务器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!