在 future-rs 中封装阻塞 I/O 的最佳方法是什么? [英] What is the best approach to encapsulate blocking I/O in future-rs?

查看:21
本文介绍了在 future-rs 中封装阻塞 I/O 的最佳方法是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我阅读了 tokio 文档,我想知道封装的最佳方法是什么将来昂贵的同步 I/O.

I read the tokio documentation and I wonder what is the best approach for encapsulating costly synchronous I/O in a future.

通过反应器框架,我们获得了绿色线程模型的优势:几个操作系统线程通过一个执行器处理大量并发任务.

With the reactor framework, we get the advantage of a green threading model: a few OS threads handle a lot of concurrent tasks through an executor.

tokio 的未来模型是需求驱动的,这意味着未来本身将轮询其内部状态以提供有关其完成的信息;允许背压和取消功能.据我了解,未来的轮询阶段必须是非阻塞的才能正常工作.

The future model of tokio is demand driven, which means the future itself will poll its internal state to provide informations about its completion; allowing backpressure and cancellation capabilities. As far as I understand, the polling phase of the future must be non-blocking to work well.

我想要封装的 I/O 可以被看作是一个长时间的原子性和昂贵的操作.理想情况下,一个独立的任务将执行 I/O,相关的 Future 将轮询 I/O 线程以获取完成状态.

The I/O I want to encapsulate can be seen as a long atomic and costly operation. Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.

我看到的仅有的两个选项是:

The two only options I see are:

  • 在未来的 poll 函数中包含阻塞 I/O.
  • 产生一个操作系统线程来执行 I/O 并使用未来机制来轮询其状态,如 显示在文档中
  • Include the blocking I/O in the poll function of the future.
  • spawn an OS thread to perform the I/O and use the future mechanism to poll its state, as shown in the documentation

据我所知,这两种解决方案都不是最佳的,也没有获得绿色线程模型的全部优势(首先在文档中不建议,其次不要通过反应堆框架提供的执行器).还有其他解决方案吗?

As I understand it, neither solution is optimal and don't get the full advantage of the green-threading model (first is not advised in documentation and second don't pass through the executor provided by reactor framework). Is there another solution?

推荐答案

理想情况下,一个独立的任务将执行 I/O,相关的 Future 将轮询 I/O 线程以获取完成状态.

Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.

是的,这是异步执行的推荐方法.请注意,这不限于 I/O,而是适用于任何长时间运行的同步任务!

Yes, this is the recommended approach for asynchronous execution. Note that this is not restricted to I/O, but is valid for any long-running synchronous task!

ThreadPool 类型是为此创建的1.

在这种情况下,您生成要在池中运行的工作.池本身执行工作以检查工作是否已完成并返回满足 Future 特征的类型.

In this case, you spawn work to run in the pool. The pool itself performs the work to check to see if the work is completed yet and returns a type that fulfills the Future trait.

use futures::{
    executor::{self, ThreadPool},
    future,
    task::{SpawnError, SpawnExt},
}; // 0.3.1, features = ["thread-pool"]
use std::{thread, time::Duration};

async fn delay_for(pool: &ThreadPool, seconds: u64) -> Result<u64, SpawnError> {
    pool.spawn_with_handle(async {
        thread::sleep(Duration::from_secs(3));
        3
    })?
    .await;
    Ok(seconds)
}

fn main() -> Result<(), SpawnError> {
    let pool = ThreadPool::new().expect("Unable to create threadpool");

    let a = delay_for(&pool, 3);
    let b = delay_for(&pool, 1);

    let c = executor::block_on(async {
        let (a, b) = future::join(a, b).await;

        Ok(a? + b?)
    });

    println!("{}", c?);
    Ok(())
}

可以看到总时间只有3秒:

You can see that the total time is only 3 seconds:

% time ./target/debug/example
4

real    3.010
user    0.002
sys     0.003

1 — 有一些 讨论当前的实现可能不是阻塞操作的最佳,但现在已经足够了.

1 — There's some discussion that the current implementation may not be the best for blocking operations, but it suffices for now.

这里,我们使用task::spawn_blocking

use futures::future; // 0.3.15
use std::{thread, time::Duration};
use tokio::task; // 1.7.1, features = ["full"]

async fn delay_for(seconds: u64) -> Result<u64, task::JoinError> {
    task::spawn_blocking(move || {
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })
    .await?;
    Ok(seconds)
}

#[tokio::main]
async fn main() -> Result<(), task::JoinError> {
    let a = delay_for(3);
    let b = delay_for(1);

    let (a, b) = future::join(a, b).await;
    let c = a? + b?;

    println!("{}", c);

    Ok(())
}

另见 CPU 密集型任务和 Tokio 文档中的阻止代码.

请注意,这不是一种有效的睡眠方式,它只是一些阻塞操作的占位符.如果您确实需要睡觉,请使用 futures-timertokio::time::sleep.请参阅为什么 Future::select 先选择睡眠时间较长的未来?了解详情

Note that this is not an efficient way of sleeping, it's just a placeholder for some blocking operation. If you actually need to sleep, use something like futures-timer or tokio::time::sleep. See Why does Future::select choose the future with a longer sleep period first? for more details

两种解决方案都不是最优的,也没有充分利用绿色线程模型的优势

neither solution is optimal and don't get the full advantage of the green-threading model

这是正确的 - 因为你没有异步的东西!您正在尝试将两种不同的方法结合起来,并且必须在某处在它们之间进行转换.

That's correct - because you don't have something that is asynchronous! You are trying to combine two different methodologies and there has to be an ugly bit somewhere to translate between them.

第二个不通过reactor框架提供的executor

second don't pass through the executor provided by reactor framework

我不确定你在这里的意思.block_ontokio::main 隐式创建了一个执行器.线程池有一些内部逻辑,可以检查线程是否完成,但这应该只在用户的执行者polls 时触发.

I'm not sure what you mean here. There's an executor implicitly created by block_on or tokio::main. The thread pool has some internal logic that checks to see if a thread is done, but that should only be triggered when the user's executor polls it.

这篇关于在 future-rs 中封装阻塞 I/O 的最佳方法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆