如何停止从tokio :: io :: lines流阅读? [英] How can I stop reading from a tokio::io::lines stream?

查看:122
本文介绍了如何停止从tokio :: io :: lines流阅读?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想终止从tokio::io::lines流中读取.我将其与oneshot的将来版本合并并终止了它,但是tokio::run仍在工作.

I want to terminate reading from a tokio::io::lines stream. I merged it with a oneshot future and terminated it, but tokio::run was still working.

use futures::{sync::oneshot, *}; // 0.1.27
use std::{io::BufReader, time::Duration};
use tokio::prelude::*; // 0.1.21

fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    let lines = tokio::io::lines(BufReader::new(tokio::io::stdin()));
    let lines = lines.for_each(|item| {
        println!("> {:?}", item);
        Ok(())
    });

    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(5000));
        println!("system shutting down");
        let _ = tx.send(());
    });

    let lines = lines.select2(rx);

    tokio::run(lines.map(|_| ()).map_err(|_| ()));
}

如何停止阅读?

推荐答案

您的策略没有错,但仅适用于不通过Tokio的blocking执行传统封锁操作的期货将来永远都不要做).

There's nothing wrong with your strategy, but it will only work with futures that don't execute a blocking operation via Tokio's blocking (the traditional kind of blocking should never be done inside a future).

您可以通过将tokio::io::lines(..) future替换为简单间隔future来进行测试:

You can test this by replacing the tokio::io::lines(..) future with a simple interval future:

let lines = Interval::new(Instant::now(), Duration::from_secs(1));


问题是tokio::io::Stdin在内部使用tokio_threadpool::blocking.


The problem is that tokio::io::Stdin internally uses tokio_threadpool::blocking .

当您使用 Tokio线程池阻止时(强调我的):

When you use Tokio thread pool blocking (emphasis mine):

NB:每当调用阻止"的整个任务被阻止时, 提供的封闭块,即使您使用了将来的组合器select -此任务中的其他期货将不会取得进展 直到关闭返回.如果不希望这样做,请确保 blocking runs in its自己的任务(例如,使用 futures::sync::oneshot::spawn).

NB: The entire task that called blocking is blocked whenever the supplied closure blocks, even if you have used future combinators such as select - the other futures in this task will not make progress until the closure returns. If this is not desired, ensure that blocking runs in its own task (e.g. using futures::sync::oneshot::spawn).

由于这将阻塞组合器中的所有其他将来,因此您的Receiver在阻塞结束之前将无法从Sender获得信号.

Since this will block every other future in the combinator, your Receiver will not be able to get a signal from the Senderuntil the blocking ends.

请参阅如何从stdin中读取非阻止?,或者您可以使用 tokio-stdin-stdout 来创建使用stdin线程中的数据.它还有一个逐行示例.

Please see How can I read non-blocking from stdin? or you can use tokio-stdin-stdout, which creates a channel to consume data from stdin thread. It also has a line-by-line example.

这篇关于如何停止从tokio :: io :: lines流阅读?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆