在future-rs中封装阻塞I/O的最佳方法是什么? [英] What is the best approach to encapsulate blocking I/O in future-rs?

查看:83
本文介绍了在future-rs中封装阻塞I/O的最佳方法是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我阅读了 tokio文档,我想知道什么是最好的封装方法将来成本很高的同步I/O.

I read the tokio documentation and I wonder what is the best approach for encapsulating costly synchronous I/O in a future.

借助反应堆框架,我们获得了绿色线程模型的优势:少数OS线程通过执行程序处理大量并发任务.

With the reactor framework, we get the advantage of a green threading model: a few OS threads handle a lot of concurrent tasks through an executor.

tokio的未来模型是需求驱动的,这意味着未来本身将轮询其内部状态以提供有关其完成的信息;允许背压和抵消功能.据我了解,未来的投票阶段必须畅通无阻,才能运作良好.

The future model of tokio is demand driven, which means the future itself will poll its internal state to provide informations about its completion; allowing backpressure and cancellation capabilities. As far as I understand, the polling phase of the future must be non-blocking to work well.

我想封装的I/O可以看作是耗时长且成本高昂的操作.理想情况下,一个独立的任务将执行I/O,而相关的将来会轮询I/O线程以了解完成状态.

The I/O I want to encapsulate can be seen as a long atomic and costly operation. Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.

我看到的两个唯一选择是:

The two only options I see are:

  • 将阻塞的I/O包括在将来的poll函数中.
  • 生成OS线程以执行I/O,并使用Future机制来轮询其状态,例如
  • Include the blocking I/O in the poll function of the future.
  • spawn an OS thread to perform the I/O and use the future mechanism to poll its state, as shown in the documentation

据我了解,这两种解决方案都不是最佳方案,也无法充分利用绿色线程模型的优势(第一个未在文档中建议,第二个未通过反应堆框架提供的执行器).还有其他解决方案吗?

As I understand it, neither solution is optimal and don't get the full advantage of the green-threading model (first is not advised in documentation and second don't pass through the executor provided by reactor framework). Is there another solution?

推荐答案

理想情况下,独立的任务将执行I/O,而相关的将来会轮询I/O线程以了解完成状态.

Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.

是的,这是异步执行的推荐方法.请注意,这不限于I/O ,但对任何长时间运行的同步任务有效!

Yes, this is the recommended approach for asynchronous execution. Note that this is not restricted to I/O, but is valid for any long-running synchronous task!

创建了 ThreadPool 类型为此 1 .

在这种情况下,您将生成工作以在池中运行.池本身执行工作以检查工作是否已完成,并返回满足Future特征的类型.

In this case, you spawn work to run in the pool. The pool itself performs the work to check to see if the work is completed yet and returns a type that fulfills the Future trait.

use futures::{
    executor::{self, ThreadPool},
    future,
    task::{SpawnError, SpawnExt},
}; // 0.3.1, features = ["thread-pool"]
use std::{thread, time::Duration};

async fn delay_for(pool: &ThreadPool, seconds: u64) -> Result<u64, SpawnError> {
    pool.spawn_with_handle(async {
        thread::sleep(Duration::from_secs(3));
        3
    })?
    .await;
    Ok(seconds)
}

fn main() -> Result<(), SpawnError> {
    let pool = ThreadPool::new().expect("Unable to create threadpool");

    let a = delay_for(&pool, 3);
    let b = delay_for(&pool, 1);

    let c = executor::block_on(async {
        let (a, b) = future::join(a, b).await;

        Ok(a? + b?)
    });

    println!("{}", c?);
    Ok(())
}

您可以看到总时间仅为3秒:

You can see that the total time is only 3 seconds:

% time ./target/debug/example
4

real    3.010
user    0.002
sys     0.003

1 —有一些讨论,当前的实现可能不是阻止操作的最佳,但现在就足够了.

1 — There's some discussion that the current implementation may not be the best for blocking operations, but it suffices for now.

在这里,我们使用task::spawn_blocking

use futures::future; // 0.3.1
use std::{thread, time::Duration};
use tokio::task; // 0.2.9, features = ["full"]

async fn delay_for(seconds: u64) -> Result<u64, task::JoinError> {
    task::spawn_blocking(move || {
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })
    .await?;
    Ok(seconds)
}

#[tokio::main]
async fn main() -> Result<(), task::JoinError> {
    let a = delay_for(3);
    let b = delay_for(1);

    let (a, b) = future::join(a, b).await;
    let c = a? + b?;

    println!("{}", c);

    Ok(())
}

其他要点

请注意,这不是有效的睡眠方式,它只是某些阻止操作的占位符.如果您确实需要睡觉,请使用未来计时器为什么Future :: select首先选择睡眠时间更长的未来?以了解更多详细信息

Additional points

Note that this is not an efficient way of sleeping, it's just a placeholder for some blocking operation. If you actually need to sleep, use something like futures-timer or tokio::time. See Why does Future::select choose the future with a longer sleep period first? for more details

这两种解决方案都不是最佳解决方案,不能充分利用绿色线程模型的优势

neither solution is optimal and don't get the full advantage of the green-threading model

是正确的-因为您没有异步的东西!您正在尝试将两种不同的方法结合起来,并且必须有一个难看的某处在它们之间进行翻译.

That's correct - because you don't have something that is asynchronous! You are trying to combine two different methodologies and there has to be an ugly bit somewhere to translate between them.

第二秒不要通过反应堆框架提供的执行器

second don't pass through the executor provided by reactor framework

我不确定您的意思.有一个由block_ontokio::main隐式创建的执行程序.线程池具有一些内部逻辑,该逻辑检查线程是否完成,但是仅应在用户的执行程序poll将其执行时触发.

I'm not sure what you mean here. There's an executor implicitly created by block_on or tokio::main. The thread pool has some internal logic that checks to see if a thread is done, but that should only be triggered when the user's executor polls it.

这篇关于在future-rs中封装阻塞I/O的最佳方法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆