如何使用reqwest执行并行异步HTTP GET请求? [英] How can I perform parallel asynchronous HTTP GET requests with reqwest?

查看:344
本文介绍了如何使用reqwest执行并行异步HTTP GET请求?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

异步示例很有用,但对它来说是新的Rust和Tokio,我正在努力找出如何立即执行N个请求,使用向量中的URL并为每个URL创建响应HTML的迭代器作为字符串的方式.

The async example is useful, but being new to Rust and Tokio, I am struggling to work out how to do N requests at once, using URLs from a vector, and creating an iterator of the response HTML for each URL as a string.

这怎么办?

推荐答案

自reqwest 0.10开始:

As of reqwest 0.10:

use futures::{stream, StreamExt}; // 0.3.1
use reqwest::Client; // 0.10.0
use tokio; // 0.2.4, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org", "https://api.ipify.org"];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| {
            async {
                match b {
                    Ok(b) => println!("Got {} bytes", b.len()),
                    Err(e) => eprintln!("Got an error: {}", e),
                }
            }
        })
        .await;
}


stream::iter(urls)

stream::iter

收集字符串并将其转换为 .

Take a collection of strings and convert it into a Stream.

.map(|url| {

StreamExt::map

在流中的每个元素上运行一个异步函数,并将该元素转换为新的类型.

Run an asynchronous function on every element in the stream and transform the element to a new type.

let client = &client;
async move {

Client进行显式引用,然后将引用(不是原始的Client)移到匿名异步块中.

Take an explicit reference to the Client and move the reference (not the original Client) into an anonymous asynchronous block.

let resp = client.get(url).send().await?;

使用Client的连接池启动异步GET请求,然后等待该请求.

Start an asynchronous GET request using the Client's connection pool and wait for the request.

resp.bytes().await

请求并等待响应的字节.

Request and wait for the bytes of the response.

.buffer_unordered(N);

StreamExt::buffer_unordered

将期货流转换为那些期货价值流,并行执行期货.

Convert a stream of futures into a stream of those future's values, executing the futures in parallel.

bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;

StreamExt::for_each

将流转换回单一的未来,打印出沿途收到的数据量,然后等待未来完成.

Convert the stream back into a single future, printing out the amount of data received along the way, then wait for the future to complete.

另请参阅:

  • Join futures with limited concurrency
  • How to merge iterator of streams?
  • How do I synchronously return a value calculated in an asynchronous Future in stable Rust?
  • What is the difference between `then`, `and_then` and `or_else` in Rust futures?

如果愿意,还可以将迭代器转换为期货迭代器,并使用 future::join_all :

If you wanted to, you could also convert an iterator into an iterator of futures and use future::join_all:

use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org", "https://api.ipify.org"];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}

我鼓励使用第一个示例,因为您通常想限制并发性,bufferbuffer_unordered可以帮助您.

I'd encourage using the first example as you usually want to limit the concurrency, which buffer and buffer_unordered help with.

这篇关于如何使用reqwest执行并行异步HTTP GET请求?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆