如何测试绑定到Tokio TcpStream的未来? [英] How can I test a future that is bound to a tokio TcpStream?

查看:170
本文介绍了如何测试绑定到Tokio TcpStream的未来?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个未来,可以使用LinesCodec将TCP流包装在Framed中.

I have a future which wraps a TCP stream in a Framed using the LinesCodec.

当我尝试将其包装在测试中时,我得到未来的阻塞大约有20%的时间,但是由于我没有监听要连接的套接字,因此我希望总是得到错误:

When I try to wrap this in a test, I get the future blocking around 20% of the time, but because I have nothing listening on the socket I'm trying to connect to, I expect to always get the error:

thread 'tokio-runtime-worker-0' panicked at 'error: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }', src/lib.rs:35:24 note: Run with 'RUST_BACKTRACE=1' for a backtrace.

这是我使用的测试代码:

This is the test code I have used:

#[macro_use(try_ready)]
extern crate futures; // 0.1.24
extern crate tokio;   // 0.1.8

use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::TcpStream;
use tokio::prelude::*;

struct MyFuture {
    addr: SocketAddr,
}

impl Future for MyFuture {
    type Item = Framed<TcpStream, LinesCodec>;
    type Error = io::Error;
    fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
        let strm = try_ready!(TcpStream::connect(&self.addr).poll());
        Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = MyFuture { addr: addr }
            .and_then(|f| {
                println!("connected");
                let cn = f.get_ref();
                cn.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

游乐场

我已经看到了其他语言中的模式,其中测试二进制文件本身提供了一种异步返回结果的机制,但是还没有找到在Rust中使用类似机制的好方法.

I have seen patterns in other languages where the test binary itself offers a mechanism to return results asynchronously, but haven't found a good way of using a similar mechanism in Rust.

推荐答案

一种简单的测试异步代码的方法可能是为每个测试使用专用的运行时:启动它,等待将来完成,并在运行结束时关闭运行时.测试.

A simple way to test async code may be to use a dedicated runtime for each test: start it, wait for future completion and shutdown the runtime at the end of the test.

#[test]
fn my_case() {

    // setup future f
    // ...

    tokio::run(f);
}

我不知道Rust生态系统中是否已经存在整合模式;参见此讨论关于对基于未来的测试支持的演变代码.

I don't know if there are consolidated patterns already in the Rust ecosystem; see this discussion about the evolution of testing support for future based code.

当您调用poll()时,将查询Future来检查值是否可用.

When you invoke poll(), the future is queried to check if a value is available.

如果没有可用的值,则会记录一个兴趣,以便在发生可以解决未来的事情时再次调用poll().

If a value is not available, an interest is registered so that poll() will be invoked again when something happens that can resolve the future.

调用MyFuture::poll()时:

  1. TcpStream::connect创建新的未来TcpStreamNew
  2. TcpStreamNew::poll仅在将来的第1步创建时被一次调用.
  3. 期货未超出范围,因此,下次调用MyFuture::poll时,您将永远无法解决以前创建的期货.
  1. TcpStream::connect creates a new future TcpStreamNew
  2. TcpStreamNew::poll is invoked immediately only once on the future's creation at step 1.
  3. The future goes out of scope, so the next time you invoke MyFuture::poll you never resolve the previously created futures.

您已经注册了一个未来的权益,如果您在第一次轮询时未解决,则永远不会再询问(轮询)已解决的值或错误.

You have registered an interest for a future that, if not resolved the first time you poll it, you never ask back again (poll) for a resolved value or for an error.

不确定"行为的原因是,第一个poll有时会立即出现ConnectionRefused错误,而立即解决,有时会永远等待将来的连接事件或从未检索到的失败.

The reason of the "nondeterministic" behavior is because the first poll sometimes resolve immediately with a ConnectionRefused error and sometimes it waits forever for a future connection event or a failure that it is never retrieved.

看看Tokio使用的mio::sys::unix::tcp::TcpStream:

Look at mio::sys::unix::tcp::TcpStream used by Tokio:

 impl TcpStream {
     pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
         set_nonblock(stream.as_raw_fd())?;

         match stream.connect(addr) {
             Ok(..) => {}
             Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
             Err(e) => return Err(e),
         }

         Ok(TcpStream {
             inner: stream,
         })
     }

当您 connect 阻塞套接字,系统调用可能会立即连接/失败或返回EINPROGRESS,在最后一种情况下,必须触发轮询以检索错误的值.

When you connect on a non-blocking socket, the system call may connect/fail immediately or return EINPROGRESS, in this last case a poll must be triggered for retrieving the value of the error.

这篇关于如何测试绑定到Tokio TcpStream的未来?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆