如何在循环中生成异步方法? [英] How can I spawn asynchronous methods in a loop?

查看:66
本文介绍了如何在循环中生成异步方法?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个对象向量,这些对象具有一个 resolve()方法,该方法使用 reqwest 查询外部Web API.在每个对象上调用 resolve()方法后,我想打印每个请求的结果.

I have a vector of objects that have a resolve() method that uses reqwest to query an external web API. After I call the resolve() method on each object, I want to print the result of every request.

这是我的半异步代码,可以编译和运行(但不是真正异步):

Here's my half-asynchronous code that compiles and works (but not really asynchronously):

for mut item in items {
    item.resolve().await;

    item.print_result();
}

我尝试使用 tokio :: join!产生所有异步调用并等待它们完成,但是我可能做错了事:

I've tried to use tokio::join! to spawn all async calls and wait for them to finish, but I'm probably doing something wrong:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

这是我遇到的错误:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

如何为所有实例一次调用 resolve()方法?

How can I call the resolve() methods for all instances at once?

这段代码反映了答案-现在我正在处理我不太了解的借位检查器错误-我应该用'static 注释我的某些变量吗?

This code reflects the answer - now I'm dealing with borrow checker errors that I don't really understand - should I annotate some of my variables with 'static?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}

error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here

推荐答案

由于您希望并行等待期货,因此可以

Since you want to await on the futures in parallel, you can spawn them into individual tasks that run in parallel. Since they run independently of each other and of the thread that spawned them, you can await their handles in any order.

理想情况下,您会这样写:

Ideally you'd write something like this:

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them to get the resolve's to complete
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

但是这将被借用检查器拒绝,因为 item.resolve()返回的Future拥有对 item 的借用引用.引用传递给 tokio :: spawn(),后者将其传递给另一个线程,并且编译器无法证明 item 会超过该线程的寿命.(当您要将对本地数据的引用发送到线程时,会遇到相同的问题.)

But this will be rejected by the borrow checker because the future returned by item.resolve() holds a borrowed reference to item. The reference is passed to tokio::spawn() which hands it off to another thread, and the compiler cannot prove that item will outlive that thread. (The same kind of problem is encountered when you want to send reference to local data to a thread.)

对此有几种可能的解决方案;我发现最优雅的方法是将项目移动到传递给 tokio :: spawn()的异步闭包中,并在完成后将任务交还给您.基本上,您使用 items 向量来创建任务,并立即从等待的结果中重新构造它:

There are several possible solutions to this; the one I find most elegant is to move items into the async closure passed to tokio::spawn(), and have the task hand them back to you once it's done. Basically you consume the items vector to create the tasks and immediately reconstitute it from the awaited results:

// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

游乐场.

请注意, futures 板条箱包含

Note that the futures crate contains a join_all function which is similar to what you need, except it polls the individual futures without ensuring that they run in parallel. We can write a generic join_parallel that uses join_all, but also uses tokio::spawn to get parallel execution:

async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

使用此功能,回答问题所需的代码简化为:

Using this function the code needed to answer the question boils down to just:

let items = join_parallel(items.into_iter().map(|mut item| async {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

.

Again, runnable code in the playground.

这篇关于如何在循环中生成异步方法?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆