如何在 Rust 中迭代返回 Futures 的函数 Vec? [英] How do I iterate over a Vec of functions returning Futures in Rust?

查看:19
本文介绍了如何在 Rust 中迭代返回 Futures 的函数 Vec?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以循环一个 Vec,调用一个在每个上返回一个 Future 的方法,并构建一个 Future 链,由消费者(最终)评估?是否执行后面的 Future 将取决于 Vec 中早期 Future 的结果.

澄清一下:

我正在开发一个可以从任意上游源集合中获取数据的应用程序.

请求数据将依次检查每个来源.如果第一个来源有错误 (Err),或者没有可用的数据 (None),那么将尝试第二个来源,依此类推.

每个源都应该只尝试一次,并且在之前的所有源都返回结果之前,不应尝试任何源.错误会被记录下来,否则会被忽略,将查询传递给下一个上游数据源.

我有一些用于获取元数据的工作代码:

///尝试将数据读/写到各种外部源.这些是///嵌套类型,因为数据源可能同时作为读取器和写入器存在结构 StoreManager {///上游数据源阅读器:Vec>>,///下游数据接收器编写器:Vec>>,}为 StoreManager 实现 StoreRead {fn 元数据(self: &Self, id: &Identifier) ->框<未来<选项<元数据>,错误>>{Box::new(ok(self.readers.iter().map(|存储| {执行者::block_on(store.borrow().metadata(id)).unwrap_or_else(|err| {错误!(元数据错误():{:?}",错误);没有任何})}).find(选项::is_some).unwrap_or(无)))}}

除了我对所有 BoxRc/RefCell 的废话不满意之外,我真正关心的是 executor::block_on() 打电话.它阻塞,等待每个 Future 返回结果,然后继续下一个.

既然可以调用fn_returning_future().or_else(|_| other_fn()) 等等,那么是否可以像这样建立一个动态链呢?还是要求在移动到下一个迭代器之前对每个 Future 进行全面评估?

解决方案

你可以使用 stream::unfold 将单个值转换为流.在这种情况下,我们可以使用 IntoIter 迭代器作为单个值.

使用futures::{executor, stream, Stream, TryStreamExt};//0.3.4类型错误 = Box;类型结果<T,E =错误>= std::result::Result;async fn network_request(val:i32) ->结果<i32>{//只是为了演示,不要在实际程序中这样做使用标准::{线,时间::{持续时间,瞬间},};线程::睡眠(持续时间::from_secs(1));println!("在 {:?} 处解析 {}", val, Instant::now());好的(验证 * 100)}fn requests_in_sequence(vals: Vec<i32>) ->impl Stream<Item = Result<i32>>{流::展开(vals.into_iter(),|mut vals|异步{让 val = vals.next()?;让响应 = network_request(val).await;一些((响应,vals))})}fn 主要() {让 s = requests_in_sequence(vec![1, 2, 3]);执行器::block_on(异步{s.try_for_each(|v| 异步移动 {println!("-> {}", v);行(())}).等待.expect("发生错误");});}

Resolving 1 at Instant { tv_sec: 6223328, tv_nsec: 294631597 }->100即时解决 2 { tv_sec: 6223329, tv_nsec: 310839993 }->200立即解决 3 { tv_sec: 6223330, tv_nsec: 311005834 }->300

<小时>

要忽略 ErrNone,您必须将 Error 传送到 Item,使Item 键入 Result, Error>:

使用futures::{executor, stream, Stream, StreamExt};//0.3.4类型错误 = Box;类型结果<T,E =错误>= std::result::Result;async fn network_request(val:i32) ->结果<选项<i32>>{//只是为了演示,不要在实际程序中这样做使用标准::{线,时间::{持续时间,瞬间},};线程::睡眠(持续时间::from_secs(1));println!("在 {:?} 处解析 {}", val, Instant::now());匹配值 {1 =>Err("boom".into()),//一个错误2 =>Ok(None),//没有数据_ =>Ok(Some(val * 100)),//成功}}fn requests_in_sequence(vals: Vec<i32>) ->impl Stream<Item = Result<Option<i32>>>{流::展开(vals.into_iter(),|mut vals|异步{让 val = vals.next()?;让响应 = network_request(val).await;一些((响应,vals))})}fn 主要() {执行器::block_on(异步{让 s = requests_in_sequence(vec![1, 2, 3]);让 s = s.filter_map(|v| 异步移动 { v.ok() });让 s = s.filter_map(|v| 异步移动 { v });让 mut s = s.boxed_local();匹配 s.next().await {一些(v)=>println!("第一次成功:{}", v),无=>println!("没有成功的请求"),}});}

Resolving 1 at Instant { tv_sec: 6224229, tv_nsec: 727216392 }立即解决 2 { tv_sec: 6224230, tv_nsec: 727404752 }即时解析 3 { tv_sec: 6224231, tv_nsec: 727593740 }第一次成功:300

<小时><块引用>

有没有可能建立这样的动态链

是的,通过利用 async 函数:

使用futures::executor;//0.3.4类型错误 = Box;类型结果<T,E =错误>= std::result::Result;async fn network_request(val:i32) ->结果<选项<i32>>{//只是为了演示,不要在实际程序中这样做使用标准::{线,时间::{持续时间,瞬间},};线程::睡眠(持续时间::from_secs(1));println!("在 {:?} 处解析 {}", val, Instant::now());匹配值 {1 =>Err("boom".into()),//一个错误2 =>Ok(None),//没有数据_ =>Ok(Some(val * 100)),//成功}}async fn requests_in_sequence(vals: Vec<i32>) ->结果<i32>{让 mut vals = vals.into_iter().peekable();而让 Some(v) = vals.next() {匹配 network_request(v).await {好的(一些(v))=>返回确定(v),Err(e) if vals.peek().is_none() =>返回错误(e),好的(无) |错误(_)=>{/* 什么都不做,尝试下一个源 */}}}Err("资源耗尽".into())}fn 主要() {执行器::block_on(异步{匹配 requests_in_sequence(vec![1, 2, 3]).await {好的(v) =>println!("第一次成功:{}", v),错误(e)=>println!("没有成功的请求:{}", e),}});}

另见:

<小时><块引用>

在移动到下一个之前,是否需要对迭代器中的每个 Future 进行全面评估

这不是你自己的要求吗?强调我的:

<块引用>

请求数据会依次检查每个来源.如果第一个来源有错误 (Err),或者没有可用的数据 (None),然后会尝试第二个来源

Is it possible to loop over a Vec, calling a method that returns a Future on each, and build a chain of Futures, to be evaluated (eventually) by the consumer? Whether to execute the later Futures would depend on the outcome of the earlier Futures in the Vec.

To clarify:

I'm working on an application that can fetch data from an arbitrary set of upstream sources.

Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried, and so on.

Each source should be tried exactly once, and no source should be tried until all of the sources before have returned their results. Errors are logged, but otherwise ignored, passing the query to the next upstream data source.

I have some working code that does this for fetching metadata:

/// Attempts to read/write data to various external sources. These are
/// nested types, because a data source may exist as both a reader and a writer
struct StoreManager {
    /// Upstream data sources
    readers: Vec<Rc<RefCell<StoreRead>>>,
    /// Downstream data sinks
    writers: Vec<Rc<RefCell<StoreWrite>>>,
}

impl StoreRead for StoreManager {
    fn metadata(self: &Self, id: &Identifier) -> Box<Future<Option<Metadata>, Error>> {
       Box::new(ok(self.readers
            .iter()
            .map(|store| {
                executor::block_on(store.borrow().metadata(id)).unwrap_or_else(|err| {
                    error!("Error on metadata(): {:?}", err);
                    None
                })
            })
            .find(Option::is_some)
            .unwrap_or(None)))
    }
}

Aside from my unhappiness with all of the Box and Rc/RefCell nonsense, my real concern is with the executor::block_on() call. It blocks, waiting for each Future to return a result, before continuing to the next.

Given that it's possible to call fn_returning_future().or_else(|_| other_fn()) and so on, is it possible to build up a dynamic chain like this? Or is it a requirement to fully evaluate each Future in the iterator before moving to the next?

解决方案

You can use stream::unfold to convert a single value into a stream. In this case, we can use the IntoIter iterator as that single value.

use futures::{executor, stream, Stream, TryStreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<i32> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    Ok(val * 100)
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<i32>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    let s = requests_in_sequence(vec![1, 2, 3]);
    executor::block_on(async {
        s.try_for_each(|v| async move {
            println!("-> {}", v);
            Ok(())
        })
        .await
        .expect("An error occurred");
    });
}

Resolving 1 at Instant { tv_sec: 6223328, tv_nsec: 294631597 }
-> 100
Resolving 2 at Instant { tv_sec: 6223329, tv_nsec: 310839993 }
-> 200
Resolving 3 at Instant { tv_sec: 6223330, tv_nsec: 311005834 }
-> 300


To ignore Err and None, you have to shuttle the Error over to the Item, making the Item type a Result<Option<T>, Error>:

use futures::{executor, stream, Stream, StreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<Option<i32>>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    executor::block_on(async {
        let s = requests_in_sequence(vec![1, 2, 3]);

        let s = s.filter_map(|v| async move { v.ok() });
        let s = s.filter_map(|v| async move { v });
        let mut s = s.boxed_local();

        match s.next().await {
            Some(v) => println!("First success: {}", v),
            None => println!("No successful requests"),
        }
    });
}

Resolving 1 at Instant { tv_sec: 6224229, tv_nsec: 727216392 }
Resolving 2 at Instant { tv_sec: 6224230, tv_nsec: 727404752 }
Resolving 3 at Instant { tv_sec: 6224231, tv_nsec: 727593740 }
First success: 300


is it possible to build up a dynamic chain like this

Yes, by leveraging async functions:

use futures::executor; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

async fn requests_in_sequence(vals: Vec<i32>) -> Result<i32> {
    let mut vals = vals.into_iter().peekable();

    while let Some(v) = vals.next() {
        match network_request(v).await {
            Ok(Some(v)) => return Ok(v),
            Err(e) if vals.peek().is_none() => return Err(e),
            Ok(None) | Err(_) => { /* Do nothing and try the next source */ }
        }
    }

    Err("Ran out of sources".into())
}

fn main() {
    executor::block_on(async {
        match requests_in_sequence(vec![1, 2, 3]).await {
            Ok(v) => println!("First success: {}", v),
            Err(e) => println!("No successful requests: {}", e),
        }
    });
}

See also:


is it a requirement to fully evaluate each Future in the iterator before moving to the next

Isn't that part of your own requirements? Emphasis mine:

Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried

这篇关于如何在 Rust 中迭代返回 Futures 的函数 Vec?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆