如何在 Rust 中彻底打破 tokio-core 事件循环和 futures::Stream [英] How to cleanly break tokio-core event loop and futures::Stream in Rust

查看:55
本文介绍了如何在 Rust 中彻底打破 tokio-core 事件循环和 futures::Stream的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在涉足 tokio-core,可以弄清楚如何生成事件循环.但是,有两件事我不确定 - 如何优雅地退出事件循环以及如何退出在事件循环内运行的流.例如,考虑这段简单的代码,它在事件循环中生成两个侦听器并等待另一个线程指示退出条件:

I am dabbling in tokio-core and can figure out how to spawn an event loop. However there are two things i am not sure of - how to gracefully exit the event loop and how to exit a stream running inside an event loop. For e.g consider this simple piece of code which spawns two listeners into the event loop and waits for another thread to indicate an exit condition:

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use futures::sync::mpsc::unbounded;
use tokio_core::net::TcpListener;
use std::net::SocketAddr;
use std::str::FromStr;
use futures::{Stream, Future};
use std::thread;
use std::time::Duration;
use std::sync::mpsc::channel;

fn main() {
    let (get_tx, get_rx) = channel();

    let j = thread::spawn(move || {
        let mut core = Core::new().unwrap();
        let (tx, rx) = unbounded();
        get_tx.send(tx).unwrap(); // <<<<<<<<<<<<<<< (1)

        // Listener-0
        {
            let l = TcpListener::bind(&SocketAddr::from_str("127.0.0.1:44444").unwrap(),
                                      &core.handle())
                .unwrap();

            let fe = l.incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        // Listener1
        {
            let l = TcpListener::bind(&SocketAddr::from_str("127.0.0.1:55555").unwrap(),
                                      &core.handle())
                .unwrap();

            let fe = l.incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        let work = rx.for_each(|v| {
            if v {
                // (3) I want to shut down listener-0 above the release the resources
                Ok(())
            } else {
                Err(()) // <<<<<<<<<<<<<<< (2)

            }
        });

        let _ = core.run(work);
        println!("Exiting event loop thread");
    });

    let tx = get_rx.recv().unwrap();

    thread::sleep(Duration::from_secs(2));
    println!("Want to terminate listener-0"); // <<<<<< (3)
    tx.send(true).unwrap();

    thread::sleep(Duration::from_secs(2));
    println!("Want to exit event loop");
    tx.send(false).unwrap();

    j.join().unwrap();
}

所以说在主线程中休眠之后,我想要事件循环线程的干净退出.目前我向事件循环发送一些东西以使其退出,从而释放线程.

So say after the sleep in the main thread i want a clean exit of the event loop thread. Currently I send something to the event loop to make it exit and thus releasing the thread.

然而,(1)(2) 都感觉很糟糕 - 我强迫一个错误作为退出条件.我的问题是:

However both, (1) and (2) feel hacky - i am forcing an error as an exit condition. My questions are:

1) 我做对了吗?如果不是,那么优雅地退出事件循环线程的正确方法是什么.

1) Am I doing it right ? If not then what is the correct way to gracefully exit the event loop thread.

2) 我不知道该怎么做 (3) - 即在外部指示关闭 listener-0 的条件并释放它的所有资源.我如何实现这一目标?

2) I don't event know how to do (3) - i.e. indicate a condition externally to shutdown listener-0 and free all it's resources. How do i achieve this ?

推荐答案

不再打开事件循环 (core)(例如通过 run())或被遗忘(drop()ed).没有同步退出.core.run() 返回并在传递给它的 Future 完成时停止循环.

The event loop (core) is not being turned any more (e.g. by run()) or is forgotten (drop()ed). There is no synchronous exit. core.run() returns and stops turning the loop when the Future passed to it completes.

A Stream 通过产生 None 完成(在下面的代码中用 (3) 标记).当例如TCP 连接关闭,Stream 表示它完成,反之亦然.

A Stream completes by yielding None (marked with (3) in the code below). When e.g. a TCP connection is closed the Stream representing it completes and the other way around.

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use futures::sync::mpsc::unbounded;
use tokio_core::net::TcpListener;
use std::net::SocketAddr;
use std::str::FromStr;
use futures::{Async, Stream, Future, Poll};
use std::thread;
use std::time::Duration;

struct CompletionPact<S, C>
    where S: Stream,
          C: Stream, 
{
    stream: S,
    completer: C,
}

fn stream_completion_pact<S, C>(s: S, c: C) -> CompletionPact<S, C>
    where S: Stream,
          C: Stream,
{
    CompletionPact {
        stream: s,
        completer: c,
    }
}

impl<S, C> Stream for CompletionPact<S, C>
    where S: Stream,
          C: Stream,
{
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        match self.completer.poll() {
            Ok(Async::Ready(None)) |
            Err(_) |
            Ok(Async::Ready(Some(_))) => {
                // We are done, forget us
                Ok(Async::Ready(None)) // <<<<<< (3)
            },
            Ok(Async::NotReady) => {
                self.stream.poll()
            },
        }
    }
}

fn main() {
    // unbounded() is the equivalent of a Stream made from a channel()
    // directly create it in this thread instead of receiving a Sender
    let (tx, rx) = unbounded::<()>();
    // A second one to cause forgetting the listener
    let (l0tx, l0rx) = unbounded::<()>();

    let j = thread::spawn(move || {
        let mut core = Core::new().unwrap();

        // Listener-0
        {
            let l = TcpListener::bind(
                    &SocketAddr::from_str("127.0.0.1:44444").unwrap(),
                    &core.handle())
                .unwrap();

            // wrap the Stream of incoming connections (which usually doesn't
            // complete) into a Stream that completes when the
            // other side is drop()ed or sent on
            let fe = stream_completion_pact(l.incoming(), l0rx)
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        // Listener1
        {
            let l = TcpListener::bind(
                    &SocketAddr::from_str("127.0.0.1:55555").unwrap(),
                    &core.handle())
                .unwrap();

            let fe = l.incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        let _ = core.run(rx.into_future());
        println!("Exiting event loop thread");
    });

    thread::sleep(Duration::from_secs(2));
    println!("Want to terminate listener-0");
    // A drop() will result in the rx side Stream being completed,
    // which is indicated by Ok(Async::Ready(None)).
    // Our wrapper behaves the same when something is received.
    // When the event loop encounters a
    // Stream that is complete it forgets about it. Which propagates to a
    // drop() that close()es the file descriptor, which closes the port if
    // nothing else uses it.
    l0tx.send(()).unwrap(); // alternatively: drop(l0tx);
    // Note that this is async and is only the signal
    // that starts the forgetting.

    thread::sleep(Duration::from_secs(2));
    println!("Want to exit event loop");
    // Same concept. The reception or drop() will cause Stream completion.
    // A completed Future will cause run() to return.
    tx.send(()).unwrap();

    j.join().unwrap();
}

这篇关于如何在 Rust 中彻底打破 tokio-core 事件循环和 futures::Stream的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆